Для преобразования данных
Преобразователи — это промежуточные шаги графа. Преобразователи получают данные через подключенные входные порты, обрабатывают их указанным пользователем способом и отправляют через подключенные выходные порты в следующие шаги.
Список шагов для преобразования данных:
Gather - собирает записи из разныых потоков в один
Map - пользовательский алгоритм обработки
Dedup - устраняет копии по ключу
DataIntersection - объединяет отсортированные потоки данных, может применять преобразование
Sort
Sort сортирует полученные записи в соответствии с указанным ключом сортировки и копирует каждую из них на все подключенные выходные порты. Позволяет использовать несколько параллельных потоков для сортировки больших данных.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Одинаковые метаданные на входных и выходных портах |
Output | 0 | да | Для отсортированных записей | |
1-n | нет | Для отсортированных записей |
Атрибуты Sort:
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
sortKey | да | Имена полей и порядок сортировки | sortKey="x_coord(a); y_coord(d)" |
sortInMemory | нет | При sortInMemory="true" выполняется внутренняя сортировка. | sortInMemory="false" (по умолчанию) |
Пример. Сортировка данных.
Входные записи содержат имена файлов и их размер. Нужно отсортировать файлы по размеру, начиная с самого большого (descending – по убыванию). Метаданные содержат поля «FileName», «FileSize».
Входящие записи:
FileName | FileSize |
---|---|
file.txt | 2048 |
file.docx | 1048576 |
file.xml | 65536 |
Решение:
Ключ сортировки: FileSortKey="FileSize(d)"
Исходящие записи:
FileName | FileSize |
---|---|
file.docx | 1048576 |
file.xml | 65536 |
file.txt | 2048 |
Filter
Filter фильтрует входные данные в соответствии с логическим выражением. Отправляет все записи, соответствующие выражению фильтра, в первый выходной порт и все отклоненные записи во второй выходной порт.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Одинаковые метаданные на входных и выходных портах |
Output | 0 | да | Для отфильтрованных записей | |
1 | нет | Для отклонённых записей |
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
filterExpression | да | Выражение, по которому фильтруются записи. Для записи преобразования используется JavaScript. Возвращает логическое значение. |
|
Пример. Фильтрация данных.
Входные данные содержат данные о продуктах, проданных в прошлом году. Нужно узнать данные только по карандашам. Метаданные содержат поля Product
, Count
и Location
.
Входящие записи:
Product | Count | Location |
---|---|---|
карандаш | 1553 | екатеринбург |
бумага | 6475 | новгород |
ручка | 598 | владикавказ |
карандаш | 177 | омск |
карандаш | 239 | волгоград |
бумага | 19 | казань |
ластик | 53 | ростов |
Решение:
Выражение для фильтрации: $in[0].product == «карандаш»
Исходящие записи:
Product | Count | Location |
---|---|---|
карандаш | 1553 | екатеринбург |
карандаш | 177 | омск |
карандаш | 239 | волгоград |
Gather
Gather собирает записи со всех входящих портов и отправляет в порядке получения на все выходные порты. Порядок получения записей не зависит от порядка входных портов. Этот шаг соблюдает порядок записей в потоках, но не соблюдает порядок потоков. На выходе получается список записей в непредсказуемом порядке. Порядок записей на разных выходах будет одинаков. Шаг не имеет атрибутов.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Одинаковые метаданные на входных и выходных портах |
1-n | нет | Для входящего потока записей | ||
Output | 0 | да | Для отфильтрованных записей | |
1-n | нет | Для отклонённых записей |
Пример. Сбор записей с нскольких входных портов.
Нужно собрать записи с нескольких входов. Метаданные содержат одно поле «id».
Входящие записи:
Решение:
Описание метаданных:
<Global>
<Metadata id="ObjectWithPos">
<Field name="id" type="int"/>
</Metadata>
</Global>
Объявление шага:
<Node id="x" guiX="250" guiY="220" guiName="Gather" type="Gather"/>
Исходящие записи:
Copy
Copy получает записи через один входной порт и копирует каждую из них на все подключенные выходные порты. Шаг не имеет атрибутов.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Любые |
Output | 0 | да | Для скопированных записей | Как у Input 0 |
1-n | нет | Как у Output 0 |
Пример. Копирование данных.
Нужно скопировать записи с метаданными «carID» и «mark» в три потока.
Входящие записи:
порт 0:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
Решение:
Для копирования в несколько потоков нужно подключить Copy несколько выходных портов. Записи на всех выходных портах будут идти в одинаковом порядке.
Исходящие записи:
порт 0:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
порт 1:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
порт 2:
carID | mark |
---|---|
145 | mercedes |
856 | toyota |
245 | chevrolet |
Concat
Concat получает записи, поступившие из первого входного порта, отправляет их на общий выходной порт и добавляет к ним записи, из остальных входных портов. Если шаг имеет более двух входных портов, записи принимаются и отправляются на выход в соответствии с порядком входных портов. Если некоторые входные порты не содержат записей, такие порты пропускается. Шаг не имеет атрибутов.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Любые |
1-n | нет | Как у Input 0 | ||
Output | 0 | да | Для объединенных записей |
Пример. Объединение записей.
Нужно объединить записи. Поданные на вход метаданные имеют поля «flower», «color».
Входящие записи:
порт 0:
flower | color |
---|---|
мак | красный |
ромашка | белый |
василек | голубой |
порт 1:
flower | color |
---|---|
роза | сиреневый |
лилия | розовый |
порт 2:
flower | color |
---|---|
подсолнух | желтый |
анемон | вишневый |
гипсофила | зеленый |
Решение:
После конкатенации будут получены следующие записи.
Исходящие записи:
порт 0:
flower | color |
---|---|
мак | красный |
ромашка | белый |
василек | голубой |
роза | сиреневый |
лилия | розовый |
подсолнух | желтый |
анемон | вишневый |
гипсофила | зеленый |
Map
Map позволяет написать пользовательский алгоритм обработки данных, используя внутренний язык системы. Можно по своему усмотрению трансформировать данные между входным и выходными портами, если предложенных шагов не хватает для выполнения необходимых преобразований данных.
Имеет единственный входной порт и как минимум один выходной. Может отправлять разные записи в разные выходные порты или даже отправлять одну и ту же запись на несколько выходных портов. Работает только с одним элементом, сохраняет порядок записей.
С помощью Map можно:
- удалить ненужные значения полей
- проверить записи с помощью функций или регулярных выражений
- создать новые или изменить существующие поля
- преобразовать типы данных
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей | Любые |
Output | 0 | да | Для преобразованных записей | |
1-n | нет |
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
transform | Алгоритм преобразования данных | ||
transformURL | 1 | Имя внешнего файла, в котором описано преобразввание | |
charset | нет | Кодировка внешнего файла, определяющего преобразование |
|
Один из атрибутов должен быть указан.
Пример. Обработка данных с помощью Map.
Нужно получить произведение и сумму полученных на вход данных и отправить результаты на разные выходные порты. Входные метаданные содержат поля a, b. Нужно отправить результат перемножения a*b на первый порт, а результат сложения a+b на второй порт.
Входящие записи:
a | b |
---|---|
5 | 6 |
2 | 4 |
1 | 2 |
Решение:
Преобразование:
<Attr name="transform"><![CDATA[
pub fn transform() -> OutPort {
let res_mul = $in[0].a * $in[0].b;
let res_add = $in[0].a + $in[0].b;
$out[0].res_mul = res_mul;
$out[1].res_add = res_add;
return ALL;
}
]]>
</Attr>
Исходящие записи:
порт 0:
multiplied |
---|
30 |
6 |
2 |
порт 1:
added |
---|
11 |
5 |
3 |
Dedup
Dedup удаляет повторяющиеся записи по ключу.
Порты Dedup:
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входных записей | Любые |
Output | 0 | да | Для дедуплицированных записей. | Как у Input 0 |
Output | 1 | нет | Для дубликатов записей. | Как у Input 0 |
Атрибуты Dedup:
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
dedupKey | нет | Ключ, по которому производится дедупликация (удаление дубликатов) записей. Если ключ не установлен, весь входной поток рассматривается как одна группа и удаляются только полные дубликаты (по всем полям записи). | dedupKey="x_coord" |
sorted | нет | Признак, ортированы ли входные записи.1 | sorted="true" по умолчанию |
dupAmount | нет | Если dedupKey задан, отбирается только указанное в dupAmount количество записей с одинаковыми значениями в полях, указанных в качестве ключа. | dupAmount="1" по умолчанию |
equalNULL | нет | По умолчанию записи с нулевыми значениями ключевых полей считаются равными. Если equalNULL="false", записи считаются разными. | equalNULL="true" по умолчанию |
Dedup может обрабатывать данные в двух режимах: сортированном и несортированном.
Сортированный и несортированный ввод
Если вы хотите обработать огромное количество записей с множеством различных значений ключей, сначала отсортируйте записи, а затем используйте Dedup с отсортированными входными данными.
Если ваши данные содержат несколько разных значений ключей, вы можете использовать несортированный ввод.
Дедуплицирование несортированных входных данных не требует предварительной сортировки, но ограничивается доступной памятью, поскольку записи, которые должны быть отправлены на первый выходной порт, кэшируются. Требования к оперативной памяти в несортированном режиме зависят от значений атрибута dupAmount. Меньшее количество дубликатов означает, что требуется меньше памяти.
Если вы используете несортированные входные записи, порядок выходных записей не обязательно будет таким же, как порядок входных записей.
Пример. Дедупликация сортированных записей.
Записи содержат время входов на некоторый ресурс с различных ip адресов. Записи отсортированы по времени входа. Нужно найти время первого входа для каждого ip адреса. Метаданные содержат поля «ip» и «time».
Входящие записи:
ip | time |
---|---|
67.249.105.118 | 11:46:12 |
208.25.71.88 | 05:14:15 |
161.100.209.235 | 23:12:32 |
161.100.209.235 | 23:19:34 |
67.249.105.118 | 15:34:09 |
223.78.208.184 | 15:35:43 |
52.151.181.4 | 21:51:17 |
223.78.208.184 | 15:38:49 |
161.100.209.235 | 23:28:16 |
Решение:
Ключ дедупликации: dedupKey = «ip»
Исходящие записи:
ip | time |
---|---|
67.249.105.118 | 11:46:12 |
208.25.71.88 | 05:14:15 |
161.100.209.235 | 23:12:32 |
223.78.208.184 | 15:35:43 |
52.151.181.4 | 21:51:17 |
Normalizer
Normalizer создает одну или несколько выходных записей из каждой отдельной входной записи. Входные записи не обязательно сортировать.
Порты Normalizer:
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входных записей | Любые |
Output | 0 | да | Для нормализованных записей | Любые |
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
normalize | да | Определение способа нормализации записей |
|
charset | нет | Кодировка файла, в котором описано преобразование normalize | charset="UTF-8" |
Для Normalizer нужно определить функцию преобразования. Преобразование должно быть определено в файле задания в артибуте normalize. Эта функция будет вызываться заданное количество раз для каждой записи, поданной на вход этому шагу.
Порядок вызова функций описан ниже.
Алгоритм нормализации записей
Функции шага Normalizer:
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры | нет |
Возвращает | Число, которое определяет количество вызовов функции Transform() для каждой записи. Если функция count() возвращает 0, то последующий вызов Transform() не производится. |
Вызов | Вызывается по одному разу для каждой входной записи. |
Описание | Описывает количество повторений вызова функции transform() |
Пример |
|
Параметр | Значение |
---|---|
Обязательный | Да |
Входные параметры | integer idx - целые числа от 0 до count-1 (здесь count — это число, возвращаемое функцией count(). |
Возвращает | Целое число. Число соответствует ![возвращаемому значению преобразования](). |
Вызов | Вызывается один раз для каждой выходной записи. Количество вызовов определяется возвращаемым значением функции count(). |
Описание | Создает выходные записи. |
Пример |
|
Пример. Преобразование записи с многозначными полями в несколько записей.
Входные записи содержат название должности и список имён сотрудников. Нужно преобразовать записи в кортежи, содержащие название должности и одно имя сотрудника.
менеджер | [Егор, Алина]
разработчик | [Артём, Никита, Данил]
Решение
Определим преобразование, используя атрибуты Normalize:
function integer count() {
return length($in.0.users);
}
function integer transform(integer idx) {
$out.0.group = $in.0.group;
$out.0.user = $in.0.users[idx];
return OK;
}
Normalizer вернёт следующие записи:
менеджер |Егор
менеджер |Алина
разработчик|Артём
разработчик|Никита
разработчик|Данил
Заметка: Если преобразование указано во внешнем файле, рекомендуется явно указать исходную кодировку в артибуте
charset
.
DataIntersection
DataIntersection получает отсортированные данные с двух портов, сравнивает значения ключей в обоих и обрабатывает записи следующим образом:
-
Такие входные записи, которые находятся как на входном порте 0, так и на входном порте 1, обрабатываются в соответствии с определяемым пользователем преобразованием, и результат отправляется на выходной порт 1.
-
Такие входные записи, которые находятся только на входном порте 0, отправляются без изменений на выходной порт 0.
-
Такие входные записи, которые есть только на входном порте 1, отправляются без изменений на выходной порт 2.
Записи считаются находящимися на обоих портах, если значения всех полей ключа соединения в них совпадают.
Преобразование должно быть определено, если подключён выходной порт 1.
Тип порта | Номер | Обязательный | Описание | Метаданные |
---|---|---|---|---|
Input | 0 | да | Для входящего потока записей (поток А) |
Любые2 |
1 | да | Для входящего потока записей (поток B) |
Любые2 | |
Output3 | 0 | нет | Для неизмененных входных записей (содержащихся только в потоке A). | как на Input 0 |
1 | нет | Для измененных выходных записей (содержащихся в обоих входных потоках) | Любые | |
2 | нет | Для неизмененных выходных записей (содержащихся только в потоке B). | как на Input 1 |
Часть полей метаданных должна совпадать с полями ключа соединения.
Хотя бы один выходной порт из трех должен быть подключён.
Атрибут | Обязательный | Описание | Возможные значения |
---|---|---|---|
joinKey | да | Ключ соединения | joinKey="$client_plan_gid=$client_plan_gid" |
transform | Функция преобразования данных, определённая в графе | ||
transformURL | Функция преобразования данных, определённая во внешнем файле | ||
charset | Кодировка внешнего файла, содержащего преобразование. | charset="UTF-8" по умолчанию | |
equalNULL | По умолчанию записи с нулевыми значениями ключевых полей считаются равными. Если установлено значение false, они считаются отличными друг от друга. | equalNULL="true" по умолчанию | |
keyDuplicates | Разрешает дублирование ключа. По умолчанию записи с нулевыми значениями ключевых полей считаются равными. Если установлено значение false, они считаются отличными друг от друга. | keyDuplicates="true" по умолчанию |
Должен быть указан хотя бы один из этих атрибутов.
Ключ соединения
Выражается как последовательность отдельных подвыражений, отделенных друг от друга точкой с запятой. Каждое подвыражение представляет собой присвоение имени поля из первого входного порта (с префиксом в виде знака доллара $
) слева и имени поля из второго входного порта (с префиксом $
) с правой стороны.
Заметка: Компонент может возвращать количество записей, отличное от исходного количества входных записей.
Если для параметраkeyDuplicates
установлено значениеfalse
, количество выходных записей может быть меньше количества входных записей, поскольку используется только первая из записей с дубликатом ключа.
Если для параметраkeyDuplicates
установлено значениеtrue
, количество выходных записей может быть больше, чем количество входных записей. На выходе создается декартово произведение записей, имеющих одинаковый ключ.