1

I am getting the following exception when trying to fetch a text column from a table in PostgreSQL. How to resolve it. Table schema :

enter image description here

Snapshot of data :

enter image description here

Table resultTable = tenv.sqlQuery(
                "SELECT entity_id, CONVERT_FROM(LO_GET(event_payload::oid), 'UTF8') " +
                        "FROM event_log");

Exception stack trace

Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered ":" at line 1, column 52.
Was expecting one of:
    "EXCEPT" ...

Dependencies

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>1.17.2</version>
        </dependency>
        <!-- Required for connecting to JDBC -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.16.2</version>
        </dependency>

UPDATE :

I tried changing the query but getting new exception

Table resultTable = tenv.sqlQuery(
                "SELECT entity_id, convert_from(lo_get(cast (event_payload as OID )), 'UTF8') " +
                        "FROM event_log");

Exception

org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 62 to line 1, column 64: Unknown identifier 'OID'
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:187)

3
  • HI. I suggest according to connector doc you should use flink-connector-jdbc version 3.1.2-1.17. Have you tried to select your column without any transformation ? What's you table looks like ?
    – Niko
    Commented Aug 14, 2024 at 7:53
  • @Niko I updated my question with the schema. I updated the version but still getting same error. It works fine without any transformation but since I need to fetch the text column as string, I need to add the transformation in query.
    – Ladu anand
    Commented Aug 14, 2024 at 14:18
  • It seems to me CONVERT_FROM and LO_GET functions is postgre functions and I'm not sure you can use them in Flink SQL. You probably need to use Flink built-in functions or make an UDF. And if you trying to rename your column as OID it should be after function call like SELECT your_function(event_payload ) AS oid FROM ...
    – Niko
    Commented Aug 14, 2024 at 15:34

2 Answers 2

0

It seems to me CONVERT_FROM and LO_GET functions is postgre functions and I'm not sure you can use them in Flink SQL. Look for Flink built-in functions if you need to do any casts or transformations.

Your column having text type in postgre table so according to data-type-mapping it's equivalent to String type in Flink and you don't really need to transform it to string. Just select it.

SELECT event_payload FROM event_log

And if you trying to rename your column as OID should go after function call like:

SELECT some_function(event_payload) AS oid FROM event_log
0

The issue you encountering is related to the face that Flink not directly support Postgresql specific-function and types , like OID or CONVERT_FROM ?

and here is two approaches for reference

1. create a view in database

you can create a view in postgresql that you can use all function you need , just take the eventual data ; view sql like following code snippet , may be modify somewhere as you need;

CREATE VIEW event_log_view AS
SELECT entity_id, convert_from(lo_get(cast(event_payload as OID)), 'UTF8') as event_payload_text
FROM event_log;

and then you can use native sql in flink smoothly

Table resultTable = tenv.sqlQuery("SELECT entity_id, event_payload_text FROM event_log_view");
2. perform conversion in java

I mean you can hard code in java that perform conversion when dataset received

Table resultTable = tenv.sqlQuery("SELECT entity_id, event_payload FROM event_log");

resultTable.map(row -> {
    String entityId = row.getField("entity_id").toString();
    byte[] eventPayload = (byte[]) row.getField("event_payload");
    String eventPayloadText = new String(eventPayload, StandardCharsets.UTF_8); // or perform OID-based conversion if needed
    return Row.of(entityId, eventPayloadText);
});


Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.