Для объединения данных

Шаги этой группы называются "Соединители". Они служат для объединения записей из потоков с потенциально разными метаданными в соответствии с заданным ключом соединения и способом преобразования.

Соединители имеют как входные, так и выходные порты. Первый входной порт шага-соединителя называется "главным" или "мастером" и обозначается номером «0», остальные входные порты — "подчинённые".

Соединители всегда объединяют только записи из главного порта с записями из подчинённых портов. И не объединяют записи из ведомых портов между собой.

HashJoin - объединяет потоки данных из двух и более источников по ключу, не требует предварительной сортировки

MergeJoin - объединяет потоки данных из двух и более источников по ключу, требует предварительной сортировки

DBJoin - объединяет по ключу два потока данных, один из которых - база данных, не требует предварительной сортировки

CrossJoin - создает декартово произведение записей из подключенных входных портов, не требует предварительной сортировки

HashJoin

HashJoin объединяет потоки данных по ключу.

Этот шаг получает данные через два или более входных порта, каждый из которых может иметь различную структуру метаданных. Записи не обязательно сортировать перед передачей в этот шаг.

Сначала HashJoin считывает записи из всех подчинённых портов и сохраняет их в хэш-таблицы. Для каждого подчинённого порта создается отдельная хэш-таблица. Размер всех созданных хэш-таблиц не должен превышать размер оперативной памяти сервера, так как хэш-таблицы хранятся в оперативной памяти и ее переполнение приведет к завершению задания с ошибкой. Поэтому имеет смысл в главный входной поток подавать большое количество записей, а в подчинённые потоки – небольшие группы записей.

Затем для каждой записи из мастера производится поиск совпадения с записями из каждой хэш-таблицы по заданному ключу соединения.

Если совпадение найдено, кортеж из записи главного порта и хэш-таблицы подчинённого порта трансформируется заданным образом. Полученная после преобразования запись подаётся на первый выходной порт. Метод преобразования вызывается для каждого кортежа главной и соответствующих подчинённых записей. Записи из главного порта, которые не были объединены подаются на второй выходной порт.

Пример: даны два потока записей. В одном потоке содержится информация о названии продукта в поле «Product» и его цвете на русском языке «rus_color», во втором потоке – названию цвета на русском языке соответствует название на английском «eng_color». Задача сопоставить продукт и его цвет на английском языке.

порт0:

productrus_color
шарфкрасный
носокбелый
свитерзеленый

порт1:

rus_coloreng_color
синийblue
желтыйyellow
красныйred

Ключ соединения: joinKey="$rus_color"

Формула для объединения:


<Attr name="transform">
<![CDATA[//#PseudoRust:code
     pub fn transform() -> OutPort {
        output.out_0.Product = input.in_0.Product;
        output.eng_color = input.in_1.eng_color;
        return out_port![ALL]
    }
]]></Attr>

Исходящие записи:

порт0:

producteng_color
шарфred

порт0:

productrus_color
носокбелый
свитерзеленый
Как у Input 0

Порты HashJoin:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые
1да
2-nнет
Output0даДля исходящего потока записей
1нетДля записей, которые не подошли по ключу соединенияКак у Input 0

Атрибуты HashJoin:

АтрибутОбязательныйОписаниеВозможные значения
joinKeyдаКлюч, по которому объединяются входящие потоки данных

joinKey="$obj_type#$obj_type=$x_type;lvl"

joinTypeнетТип объединения. Бывает "inner"(по умолчанию) и "leftOuter"joinType="leftOuter"
transformда

Преобразование, определенное в файле задания на внутреннем языке системы1


<Attr name="transform"><![CDATA[//#Rust:init
    obj_type:   input_0.obj_type,
    lvl:        input_0.lvl + 1,
    x_coord:    input_0.x_coord,
    y_coord:    input_0.y_coord,
    max_hp:     input_1.max_hp + 15,
    max_mana:   input_1.max_mana,
    exp:input_1.exp * input_1.exp,
    prob:input_2.prob,
]]>
</Attr>

1

В данный момент используется псевдораст, в дальнейшем возможен переход на другой язык описания преобразований.

MergeJoin

Объединяет данные из двух или более источников данных по общему ключу. Данные должны быть отсортированы перед подачей в этот шаг.

MergeJoin получает данные через два или более входных порта, каждый из которых может иметь различную структуру метаданных. Затем объединенные данные отправляются на первый выходной порт. Необъединенные данные можно вывести на второй выходной порт.

Порты MergeJoin:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даГлавный входной портлюбые
1даВедомый входной порт
2-nнетДополнительные ведомые входные порты
Output0даВыходной порт для объединенных данных
1нетВыходной порт для необъединённых данныхкак на Input 0

Метаданные на первом входе и втором выходе должны быть одинаковыми. (Поля метаданных должны иметь одинаковые типы, имена полей метаданных могут отличаться.)

Атрибуты MergeJoin:

АтрибутОбязательныйОписаниеВозможные значения
joinKeyдаКлюч, по которому объединяются входящие потоки данных. Представляет собой последовательность выражений сопоставления для всех ведомых портов. Выражения сопоставления отделяются друг от друга хешем #. Каждое из выражений сопоставления представляет собой последовательность имен полей из главной и подчинённой записей, разделенных знаком равно. Выражения внутри joinKey отделяются друг от друга точкой с запятойjoinKey="$name(a);$color(d)#$product;$tint;#$name;$tone"
joinTypeнетТип объединенияinner (по умолчанию) | leftOuter
transformдаПреобразование, определенное в файле задания на внутреннем языке системы

<Attr name="transform"><![CDATA[//#PseudoRust:code
    pub fn transform() -> OutPort {
        if !is_full_match {
            output.id = input.in_0.id_id;

        if let Some(in_1) = input.in_1 {
            output.out_0.x = in_1.field_int;
            } else {
                output.out_0.x = 42;
            }

        if let Some(in_2) = input.in_2 {
            output.out_0.y = in_2.field_string;
            } else {
                output.out_0.y = "text Text TEXT".into();
            }

            return out_port![ALL]
            } else {
                return out_port![SKIP]
            }
        }
        pub fn on_error(_: OutPort) { }
    }
]]>
</Attr>

DBJoin

DBJoin получает данные через один входной порт и объединяет их с данными из таблицы базы данных. Эти два источника данных могут иметь разную структуру метаданных. Этот шаг можно использовать в большинстве распространенных ситуаций. Он не требует сортировки входных данных и работает очень быстро, поскольку данные обрабатываются в памяти.

После объединения данные отправляются на первый выходной порт. Второй выходной порт может дополнительно использоваться для вывода несовпадающих записей из основного потока.

Данные, передаваемые через первый входной порт, называются ведущими, а поток данных из базы называется подчинённым. Его данные считаются поступившими через виртуальный дополнительный входной порт. Каждая основная запись сопоставляется с подчинённой записью по одному или нескольким полям, называемым ключом соединения.

Ключ соединения — это последовательность имен полей из основного источника данных, отделенных друг от друга точкой с запятой, двоеточием или вертикальной чертой. Вы можете определить ключ в мастере редактирования ключей. Порядок имен этих полей должен соответствовать порядку ключевых полей таблицы базы данных (и их типам данных). подчинённая часть ключа соединения должна быть определена в атрибуте запроса SQL.

Выходные данные создаются путем применения преобразования, которое сопоставляет объединенные входные данные с выходными.

Главное отличие от других шагов из группы Соединителей в том, что нужно прописывать в атрибутах шага подключение к базе и запрос данных.

Порты DBJoin:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даГлавный входной портлюбые
1даВедомый входной порт
2-nнетДополнительные ведомые входные порты
Output0даВыходной порт для объединенных данных
1нетВыходной порт для необъединённых данныхкак на Input 0

Атрибуты DBJoin:

АтрибутОбязательныйОписаниеВозможные значения
joinKeyдаКлюч, по которому объединяются входящие потоки данных.joinKey="schemaName;tableName"
leftOuterJoinФлаг для внутреннего объединенияleftOuterJoin="true"
dbConnectionдаИдентификатор соединения с БД, который будет использоваться в качестве ресурса подчинённых записей.dbConnection="JDBC0"
sqlQuery

да

Путь к внешнему файлу, определяющему SQL-запрос.

<attr name="sqlQuery">
    <![CDATA[
        select c.ordinal_position
        from information_schema.tables t
            join information_schema.columns c on c.table_name=t.table_name
                                              and c.table_schema=t.table_schema
        where lower(t.table_schema)=lower(?)
            and lower(t.table_name)=lower(?)
            and lower(c.column_name)=lower(?)
   ]]>
</attr>
transform

да

Функция преобразования
maxCachedМаксимальное количество записей с разными значениями ключей, которые можно сохранить в памяти.maxCached="100"
charsetКодировка файла, в котором хранится алгоритм преобразования.charset="UTF-8" (по умолчанию)

Ключ соединения для DbJoin

$first_name;$last_name

Это поля, которые служат для объединения основных записей с подчинёнными записями. SQL-запрос должен содержать выражение, которое может выглядеть следующим образом:

... where fname=? and lname=?

Значение first_name будет подставлено на место первого знака вопроса в этом условии, а last_name - на место второго. Сначала будет произведён поиск совпадений по joinkey в кеш-памяти, если подходящих записей не обнаружится, то данные будут запрошены из базы данных, иначе - найденная запись сразу отправится для преобразования в функцию transform.

Преобразование

Преобразование (transform) в DBJoin позволяет определить способ сопоставления данных, с помощью которого записи буду отправлены на первый выходной порт. Несоединенные записи из основного потока, отправляемые на второй выходной порт, не могут быть изменены в рамках преобразования DBJoin.

Заметка: Если преобразование указано во внешнем файле, рекомендуется явно указать кодировку файла в артибуте Transform source charset.

CrossJoin

CrossJoin создает декартово произведение записей из подключенных входных портов.

Каждая строка из первого порта соединяется с каждой строкой из последующих портов, в результате получаются все возможные сочетания значений со всех портов. Возможно преобразование данных с помощью атрибута transform.

CrossJoin автоматически передаёт метаданные на выходной порт в соответствии с метаданными на его входных портах.

Заметка: если вы обрабатываете очень большое количество записей, на вашем жёстком диске могут быть созданы временные файлы с обрабатываемыми записями. Это предотвращает чрезмерное использование оперативной памяти.

Порты CrossJoin:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даГлавный входной портлюбые
1-nнетВедомый входной порт(ы)
Output0даДля выходных записей

Атрибуты CrossJoin:

АтрибутОбязательныйОписаниеВозможные значения
transform

нет2

Функция преобразования данных, определённая в графе

<Attr name="transform">
    <![CDATA[//#PseudoRust:code
        pub fn transform() -> OutPort {
            output.out_0.Product = input.in_0.Product;
            output.eng_color = input.in_1.eng_color;

            return out_port![ALL]
        }
    ]]>
</Attr>
url

нет2

Функция преобразования данных, определённая во внешнем файле
charsetКодировка внешнего файла, содержащего преобразование.по умолчанию UTF-8
2

В случае необходимости описать преобразование, можно указать только один из этих атрибутов.

Пример.

Создать таблицу со всеми возможными сочетаниями игроков в бильярд из двух команд:

Игроки первой команды:

Вася
Маша
Никита

Игроки второй команды:

Алёна
Петя
Лиза

Решение: Вам нужно только подключить источники данных к портам компонента CrossJoin. Настройка атрибутов компонента не требуется.

В результате получится такой набор пар игроков в бильярд:

Вася | Алёна
Вася | Петя
Вася | Лиза
Маша | Алёна
Маша | Петя
Маша | Лиза
Никита | Алёна
Никита | Петя
Никита | Лиза

Заметка: Ребро, по которому передаётся наибольшее количество записей, должно быть подключено к первому входному порте.