Skip to content

Demo: Pull Queries

This demo shows a pull query in action.

CREATE STREAM riderLocations

Use ksql to execute the following CREATE STREAM DDL statement.

CREATE STREAM riderLocations (
    profileId VARCHAR,
    latitude DOUBLE,
    longitude DOUBLE)
  WITH (
    kafka_topic='locations',
    value_format='json',
    partitions=1);

Produce JSON Record

echo '{"profileId":0, "latitude":10.5, "longitude": 20.1}' | kcat -P -b :9092 -t locations

Execute Pull Query

Issue a pull query.

ksql> SELECT * FROM riderLocations;
+------------------------------------------------+------------------------------------------------+------------------------------------------------+
|PROFILEID                                       |LATITUDE                                        |LONGITUDE                                       |
+------------------------------------------------+------------------------------------------------+------------------------------------------------+
|0                                               |10.5                                            |20.1                                            |
Query Completed
Query terminated

Explain Query

ksql> EXPLAIN SELECT * FROM riderLocations;

ID                   : transient_RIDERLOCATIONS_5838976163364029274
Query Type           : PUSH
SQL                  : SELECT * FROM riderLocations;

 Field     | Type
-----------------------------
 PROFILEID | VARCHAR(STRING)
 LATITUDE  | DOUBLE
 LONGITUDE | DOUBLE
-----------------------------

Sources that this query reads from:
-----------------------------------
RIDERLOCATIONS

For source description please run: DESCRIBE [EXTENDED] <SourceId>

Execution plan
--------------
 > [ PROJECT ] | Schema: PROFILEID STRING, LATITUDE DOUBLE, LONGITUDE DOUBLE | Logger: transient_RIDERLOCATIONS_5838976163364029274.Project
   > [ SOURCE ] | Schema: PROFILEID STRING, LATITUDE DOUBLE, LONGITUDE DOUBLE, ROWTIME BIGINT, ROWPARTITION INTEGER, ROWOFFSET BIGINT | Logger: transient_RIDERLOCATIONS_5838976163364029274.KsqlTopic.Source


Processing topology
-------------------
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [locations])
      --> KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
      --> Project
      <-- KSTREAM-SOURCE-0000000000
    Processor: Project (stores: [])
      --> KSTREAM-PROCESSOR-0000000003
      <-- KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-PROCESSOR-0000000003 (stores: [])
      --> none
      <-- Project

Describe Stream

ksql> DESCRIBE riderlocations;

Name                 : RIDERLOCATIONS
 Field     | Type
-----------------------------
 PROFILEID | VARCHAR(STRING)
 LATITUDE  | DOUBLE
 LONGITUDE | DOUBLE
-----------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
ksql> DESCRIBE riderlocations EXTENDED;

Name                 : RIDERLOCATIONS
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON
Kafka topic          : locations (partitions: 1, replication: 1)
Statement            : CREATE STREAM RIDERLOCATIONS (PROFILEID STRING, LATITUDE DOUBLE, LONGITUDE DOUBLE) WITH (KAFKA_TOPIC='locations', KEY_FORMAT='KAFKA', PARTITIONS=1, VALUE_FORMAT='JSON');

 Field     | Type
-----------------------------
 PROFILEID | VARCHAR(STRING)
 LATITUDE  | DOUBLE
 LONGITUDE | DOUBLE
-----------------------------

Local runtime statistics
------------------------


(Statistics of the local KSQL server interaction with the Kafka topic locations)

Disable Stream Pull Queries

The above steps worked just fine because pull queries on streams are enabled by default (based on ksql.query.pull.stream.enabled).

Let's turn it off and see the result. In the ksql CLI execute the following command:

SET 'ksql.query.pull.stream.enabled' = 'false';

This time executing the following query will inevitably lead to an exception.

SELECT * FROM riderLocations;
Pull queries on streams are disabled. To create a push query on the stream, add EMIT CHANGES to the end. To enable pull queries on streams, set the ksql.query.pull.stream.enabled config to 'true'.
Statement: SELECT * FROM riderLocations;: SELECT * FROM riderLocations;