I tried to transfer data from one table to another in Flink using SQL in BATCH mode and noticed that at first all the data tries to cache in memory before going further along the pipeline. Because I have a lot of data, I get a memory shortage error. Is there a way to read data in chunks rather than all at once?
Example of a stream:
CREATE TABLE t_destination_nrt (
`fk_id` decimal(32, 0) primary key not enforced,
`fv_text` string,
`fd_timestamp` timestamp,
`fv_desc` string
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
'table-name' = 'nrt.t_destination',
'username' = 'postgres',
'password' = 'postgres',
'scan.fetch-size' = '1000',
'lookup.cache.ttl' = '60s',
'lookup.cache.max-rows' = '10000',
'lookup.max-retries' = '3',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
)
CREATE TABLE t_source_nrt (
`fk_id` decimal(32, 0) primary key not enforced,
`fv_text` string,
`fd_timestamp` timestamp
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
'table-name' = 'nrt.t_source',
'username' = 'postgres',
'password' = 'postgres',
'scan.fetch-size' = '1000',
'lookup.cache.ttl' = '60s',
'lookup.cache.max-rows' = '10000',
'lookup.max-retries' = '3',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
)
CREATE TABLE t_dict_nrt (
`fk_id` decimal(32, 0) primary key not enforced,
`fv_desc` string
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://192.168.1.17:5432/postgres',
'table-name' = 'nrt.t_dict',
'username' = 'postgres',
'password' = 'postgres'
)
insert into t_destination_nrt (fk_id, fv_text, fd_timestamp, fv_desc)
select /*+ BROADCAST(dn) */
sn.fk_id,
sn.fv_text,
sn.fd_timestamp,
dn.fv_desc
from t_source_nrt sn
left join t_dict_nrt dn on sn.fk_id % 3 = dn.fk_id
I try reload data using flink sql in batch mode from table to table. I expected the data loading by chunk, but flink performs full caching of the source table before insertion into destination.

