Для записи данных

Шаги для записи данных могут записывать данные в локальные выходные файлы, отправлять их через подключенный дополнительный выходной порт или вставлять в таблицу базы данных.

Шаги для записи — это компоненты графа, которые выполняются последними, поэтому они не имеют выходных портов. Компонент Trash, который отбрасывает все полученные записи, относится к этой группе, поскольку его можно настроить на сохранение записей в файл отладки.

Каждый шаг для записи должен иметь хотя бы один входной порт, через который данные поступают в этот компонент графа.

Шаги для записи могут либо добавлять данные в существующий файл или таблицу базы данных, либо заменять существующее содержимое новым. Для этой цели шаги для записи в файлы, имеют атрибут Append. По умолчанию для этого атрибута установлено значение false. Это означает, что данные необходимо заменить, а не добавить к имеющимся.

Данные можно записать в один и тот же файл или базу данных несколько раз в течение работы графа с помощью нескольих шагов для записи. Для этого разместите шаги для записи в разных фазах графа.

Чтобы увидеть результат работы конкретного шага, щёлкните по нему правой кнопкой мыши и выберите в открывшемся меню опцию «Просмотр данных отладки». Это позволяет просматривать записанные данные используя панель состояний. Панель отображает данные только после того, как граф уже запущен.

FlatFileWriter - запись в файл

DbWriter - запись в базу данных

PgDbWriter - запись в базу данных Postgres с помощью утилиты

Trash - прерывание потока данных

RawWriter - записывает данные в файл в формате OneBridgeFile

Общие свойства шагов для записи

Поддерживаемые форматы URL-адресов для записывающих шагов

Запись в локальные файлы

  • /path/filename.out - записывает указанный файл на диск.

Запись на выходной порт

  • port:$0.FieldName:discrete - если этот URL-адрес используется, выходной порт шага для записи должен быть подключен к другому компоненту. Выходные метаданные должны содержать FieldName с типом string. Каждая запись данных, полученная шагом для записи через входной порт, обрабатывается в соответствии с входными метаданными, отправляется через дополнительный выходной порт и записывается как значение указанного поля метаданных выходного ребра.

Запись в словарь

  • dict:keyName:source - записывает данные в файл с URL-адресом, указанным в словаре. URL-адрес целевого файла извлекается из указанной записи словаря.
  • dict:keyName:discrete - записывает данные в словарь. Создаёт список ArrayList<byte[]>.
  • dict:keyName:stream - Записывает данные в словарь. Создает WritableByteChannel.

Просмотр записанных данных

После создания выходного файла вы можете просмотреть данные в нём с помощью Испектора данных.

Запись на выходной порт

Некоторые устройства записи позволяют передавать данные через дополнительный выходной порт.

Установите атрибут fileURL в формате:

port:$0.FieldName[:processingType]

Здесь ProcessingType является необязательным и может быть установлен в одно из следующих значений:

  • discrete - cодержимое файла сохраняется в поле (одной записи). Данные должны быть достаточно маленькими, чтобы поместиться в это одно поле. Если данные разделены на несколько файлов, отправляется несколько выходных записей. Каждая выходная запись содержит входные данные одного раздела.
  • stream - содержимое файла записывается в поток, который разбивается на фрагменты. Фрагменты записываются в указанное пользователем поле вывода. Один фрагмент относится к одной выходной записи, поэтому ваши данные не обязательно должны помещаться в одно поле данных.

Поток завершается другой записью с нулевым значением в поле (в качестве контрольного сигнала). Если данные разделены на несколько файлов, значение null также служит разделителем между файлами.

Количество выходных записей зависит от значения параметра PortReadingWriting.DATA_LENGTH. Значение по умолчанию — 2048 Б.

Добавление или перезапись

Если целевой файл существует, есть два варианта:

  • существующий файл можно заменить;
  • записи могут быть добавлены к существующему содержимому.

Добавление или замена настраивается с помощью атрибута Append.

  • Если для параметра Append установлено значение true, записи добавляются в файл.
  • Если для параметра Append установлено значение false, файл перезаписывается. Append=false по умолчанию.

Функция Append доступна в следующих шагах для записи: FlatFileWriter, RawWriter, Trash.

Создание каталогов

Если вы указываете несуществующий каталог в URL-адресе файла, установите для атрибута Create directories значение true. Каталог будет создан. В противном случае граф выдаст ошибку.

Значение по умолчанию Create directories=false.

Атрибут Create directories доступен в следующих шагах для записи: FlatFileWriter, RawWriter, Trash.

FlatFileWriter

FlatFileWriter записывает данные в плоские файлы.

Порты FlatFileWriter:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля входящего потока записейЛюбые

Атрибуты FlatFileWriter:

АтрибутОбязательныйОписаниеВозможные значения
fileURLдаПуть к файлу, в который должен быть записан результирующий набор данных${WRITE_DIR}/out.txt
charsetнетКодировка входного файлаcharset="UTF-8" по умолчанию
appendнетЕсли записи печатаются в существующий непустой файл, они по умолчанию заменяют более старые (при append="false"). Если установлено значение "true", новые записи добавляются в конец существующего содержимого выходного файла(ов).append="false" по умолчанию
quotedStrings1нетПри quotedStrings="true" все поля заключаются в кавычки.quotedStrings="true"
quoteChar1нетСимволы, в которые будет заключено значение поля при quotedStrings="true". По умолчанию значение этого атрибута наследуется из метаданных порта ввода 0.quoteChar="&quot;"
fieldDelimiterнетРазделитель полейfieldDelimiter=","
recordDelimiterнетРазделитель записейrecordDelimiter=">/~~/<"
1

По умолчанию значение этого атрибута наследуется из метаданных порта ?ввода? 0.

Пример. Запись данных в файл.

Например, нужно записать обработанные системой данные в файл, используя разделитель «|».

Данные в системе:

datelast_namefirst_name
01.02.2011ГончаровАлексей
29.12.2013НечаевИлья
25.11.2016ВаськинНиколай
23.10.2019СеровГригорий
19.09.2022ГлинкаЕвгений

Данные, записанные шагом FlatFileWriter в файл:

01.02.2011|Горилов|Алексей
29.12.2013|Нечаев|Илья
25.11.2016|Васькин|Николай
23.10.2019|Иванов|Григорий
19.09.2022|Горбунов|Евгений

DbWriter

DbWriter предназначен для выгрузки обработанной информации в базу данных и совершения изменений в базе. Позволяет выполнять несколько SQL-запросов в рамках одной транзакции, для этого выражения в атрибуте sqlQuery разделяются точкой с запятой.

Поддерживает подключение к базам MySQL, Oracle, PostgreSQL.

Подробнее про подключение к базам данных можно прочитать в разделе Соединения с базами данных.

Порты DbWriter:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даЗаписи для загрузки в базу данныхЛюбые
Output0нетДля отклонённых записейТакие же, как на выходном порте
Output1нетДля возвращённых записейТакие же, как на выходном порте

Атрибуты DbWriter:

АтрибутОбязательныйОписаниеВозможные значения
dbConnection да

Параметры соединения с базой данных. В список параметров для подключения могут входить: database, user, password, host, port.
Параметры можно указать в атрибуте конкретного шага либо в глобальных параметрах графа.

dbConnection="postgres://admin:admin@localhost:5432/dev"  
sqlQuery нет Запрос к базе
            <Attr name="sqlQuery"> <![CDATA[
                INSERT INTO public."Customers" (id, surname, name)
                VALUES (16, 'surname16', 'name16');
                ]]>
            </Attr> 


<attr name="sqlQuery">
    <![CDATA[
        insert into test_table (num,str,test_date) 
        values($num,$str,$test_date) 
        returning str;
    ]]>
</attr>
batchSize нет Количество записей для объединения. Актуально если batchMode="true". batchSize="5"
batchMode нет Определяет режим записи в таблицу. Записывать сразу по несколько записей – true, по одной – false. Пакетный режим ускоряет загрузку данных в базу данных. batchMode="true"
commit нет Определяет, после обработки скольких записей (без ошибок) выполняется коммит (возможные значения -1,0,N). commit="10"
maxErrorCount нет Максимальное количество разрешенных ошибок. При превышении этого числа ошибок граф выходит из строя. По умолчанию ошибки не допускаются. Если установлено значение -1, все ошибки разрешены. maxErrorCount="0"
actionOnError нет Действие при превышении допустимого количества ошибок maxErrorCount. Если установлено значение ROLLBACK, фиксация текущего пакета не выполняется (актуально только для Oracle). Commit для Postgres делает тоже, что и Rollback, MsSql автоматически делает Rollback. actionOnError="commit"

Пример. Загрузка записей из OneBridge в SQLite.

Нужно загрузить данные из OneBridge в базу данных SQLite в таблицу Tracking, в поля client, items, total.

Данные в системе:

customerproductamount_of_purch
JazzveCoffeeCoffea arabica19513
Arabica Legasy LLCCoffea canephora12735
BlackBean GroupExcelsa34010

Решение:

Задайте соединение с базой:


<Connection id="CONN_A" dbURL="${CONN_TYPE}://${DB_01_USR}:${DB_01_PWD}@${DB_01_HOST}:${DB_01_PORT}/${DB_01_DATABASE}"/>

Пропишите в файл задания SQL-запрос:


<Phase ...>
    ...
    <Node id="db_writer" guiX="250" guiY="100" guiName="DatabaseWriter" dbConnection="CONN_A" type="WriterDB">
        <Attr name= “sqlQuery”><![CDATA[
            INSERT INTO public."Tracking" ("client", "items", "total")
            VALUES ($customer, $product, $amount_of_purch)
        ]]></Attr>
    </Node>
    ...
</Phase>

Чтобы вставить значения полей из системы нужно указать название полей из метаданных после знака «$».

Данные будут выгружены в базу данных, соответствующую указанному типу соединения, в таблицу Tracking.

PgDbWriter

PgDbWriter массовый загрузчик, подходящий для загрузки большого количества записей в базу данных PostgreSQL. Считывает данные через входной порт. Использует специальную утилиту Copy, которая позволяет загружать данные очень быстро. Для остальных случаев лучше использовать DbWriter, для которого не требуется использование специальной утилиты.

Порты PgDbWriter:

Тип портаНомерОбязательныйОписаниеМетаданные
Input1-nдаЗаписи для загрузки в базу данныхЛюбые

Пример: необходимо загрузить записи с метаданными «Product» (string), «Amount» (int), «date» (date) и «Price» (float) в таблицу Products в базу данных postgres с именем пользователя user001.

Настройте следующие параметры подключения в файле задания:


<Connection id="CONN_A" dbURL="postgresql://user:password/host:port/database"/>

Данные будут внесены в базу:

PgDbWriter записывает данные в базу PostgreSQL

Атрибуты PgDbWriter

АтрибутОбязательныйОписаниеВозможные значения
dbConnectionдаИдентификатор соединения с базой данных`dbConnection="CONN_A"`
dbURLнетПараметры соединения с базой данных. В список параметров для подключения могут входить: database, user, password, host, port

<Connection id="CONN_A" dbURL="${PG_CONN}://${DB_01_USR}:${DB_01_PWD}@${DB_01_URL}:${DB_01_PORT}/${DB_01_DATABASE}"/>

dbTableнетНазвание таблицы, в которую будет производится запись.`dbTable="test_date_02"`
columnsнетПеречисление конкретных столбцов таблицы, в которые будет произведена запись (по умолчанию - во все).

columns="id, count, price"

sendFreqнетУказывает количество записей, которые должны быть записаны в базу данных за один раз.

sendFreq="40000"

sendMsgSizeнетРазмер отправляемой записи.

sendMsgSize="20kb" (килобайт) или sendMsgSize="20" (байт)

binaryнетФормат файла, в который производится запись.

binary="true" — в бинарном,
если binary="false" — в CSV.

dencodingнетКодировка, используемая при записи. При binary="true" игнорируется.

encoding="UTF8"

Trash

Trash используется для прерывания потока данных, когда не нужно передавать данные дальше. Шаг не имеет выходных портов и не имеет атрибутов.

Trash прерывает поток данных.

Trash прерывает поток данных

Порты Trash:

Тип портаНомерОбязательныйОписаниеМетаданные
Input1-nнетДля входящего потока записей.Любые

Атрибуты Trash:

АтрибутОбязательныйОписаниеВозможные значения
debugOutputнетПо умолчанию все записи удаляются. Если установлено значение true, все записи записываются в лог на вкладку «Консоль». Этот режим поддерживается при подключении любого количества входных портов.debugOutput="true"

RawWriter

RawWriter записывает обрабатываемые данные во внутренние файлы формата OneBridgeFile.

Порты RawWriter:

Тип портаНомерОбязательныйОписаниеМетаданные
Input0даДля полученных записей данныхЛюбые
Output0нетДля записи на выходной портс типом данных byte/cbyte

Атрибуты RawWriter:

АтрибутОбязательныйОписаниеВозможные значения
fileURLдаАтрибут, указывающий, куда будут записаны полученные данные.fileURL="${DATATMP_DIR}/bl/${trgFilePath}/loadHub/${tableName}.sql"
appendПо умолчанию новые записи перезаписывают старые. Если установлено значение true, новые записи добавляются к старым записям, хранящимся в выходном файле(ах). append="false" по умолчанию append="true"

Пример. Запись данных во внутренний формат.

Чтобы записать данные в файл ${DATAOUT_DIR}/my-file.obf нужно заполнить атрибут fileURL.

АтрибутЗначение
fileURL${DATAOUT_DIR}/my-file.obf

RawWriter запишет данные в указанный файл.

Пример. Добавление к существующему файлу.

Добавить записи каждого запуска графа в существующий файл ${DATAOUT_DIR}/my-file.obf. Для этого нужно заполнить атрибут fileURL и append.

АтрибутЗначение
fileURL${DATAOUT_DIR}/my-file.obf
appendtrue