
Demo: CREATE TABLE AS SELECT

This demo shows the internals of CREATE TABLE AS SELECT (CTAS).

CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? sourceName
  (WITH tableProperties)?
  AS query

The CREATE TABLE AS SELECT statement is parsed by AstBuilder into a CreateTableAsSelect statement.
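To make the grammar rule concrete, here is a toy recognizer in Python. It is only a sketch, and it rests on stated assumptions. ksqlDB's real parser is generated by ANTLR, and AstBuilder walks the resulting parse tree. This regex handles only the shape of the rule above. CreateTableAsSelectSketch and parse_ctas are hypothetical names. Their fields loosely follow what a CTAS AST node has to carry: the table name, the OR REPLACE and IF NOT EXISTS flags, the WITH properties and the query.

```python
import re
from dataclasses import dataclass, field

# Toy regex for: CREATE (OR REPLACE)? TABLE (IF NOT EXISTS)? name (WITH (...))? AS query
# Hypothetical illustration only; not ksqlDB's ANTLR grammar.
CTAS_RULE = re.compile(
    r"^\s*CREATE\s+(?P<or_replace>OR\s+REPLACE\s+)?TABLE\s+"
    r"(?P<if_not_exists>IF\s+NOT\s+EXISTS\s+)?(?P<name>\w+)\s+"
    r"(?:WITH\s*\((?P<props>[^)]*)\)\s+)?AS\s+(?P<query>.+?);?\s*$",
    re.IGNORECASE | re.DOTALL,
)


@dataclass
class CreateTableAsSelectSketch:
    """Hypothetical stand-in for the AST node produced for a CTAS statement."""
    name: str
    or_replace: bool
    if_not_exists: bool
    properties: dict = field(default_factory=dict)
    query: str = ""


def parse_ctas(sql: str) -> CreateTableAsSelectSketch:
    m = CTAS_RULE.match(sql)
    if m is None:
        raise ValueError("not a CREATE TABLE AS SELECT statement")
    props = {}
    if m.group("props"):
        # Naive KEY=VALUE splitting; real property parsing handles quoting properly.
        for pair in m.group("props").split(","):
            key, _, value = pair.partition("=")
            props[key.strip().upper()] = value.strip().strip("'")
    return CreateTableAsSelectSketch(
        name=m.group("name").upper(),  # unquoted identifiers are upper-cased
        or_replace=m.group("or_replace") is not None,
        if_not_exists=m.group("if_not_exists") is not None,
        properties=props,
        query=m.group("query").strip(),  # trailing ';' is not part of the query
    )
```

Every optional part of the rule becomes either a boolean flag or an (optionally empty) property map. The query text is kept as-is for later analysis.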

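CREATE TABLE AS SELECT runs as a persistent query. This is a Kafka Streams topology that keeps running until it is terminated. It continuously reads from the source and writes the results to the table's Kafka topic.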

Create Stream

CREATE STREAM product_updates (
  productId VARCHAR,
  price DOUBLE)
WITH (
  kafka_topic='product_updates',
  value_format='json',
  partitions=1);
 Message
----------------
 Stream created
----------------
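The stream declares no key column, so each record's value alone supplies PRODUCTID and PRICE. The sketch below shows how a JSON value such as {"productId": "p1", "price": 20.0} can be mapped onto those columns. It uses a hypothetical deserialize_value function. It assumes case-insensitive field matching, which is consistent with the demo's output, where productId ends up in PRODUCTID. It also assumes that missing fields become NULL. It is not ksqlDB's actual deserializer.

```python
import json

# Value columns of product_updates, as declared in CREATE STREAM (upper-cased).
VALUE_SCHEMA = [("PRODUCTID", str), ("PRICE", float)]


def deserialize_value(raw: bytes) -> dict:
    """Map a JSON record value onto the stream's value columns.

    Assumptions: JSON field names match column names case-insensitively,
    and a field missing from the JSON becomes None (SQL NULL).
    """
    obj = json.loads(raw)
    by_upper = {k.upper(): v for k, v in obj.items()}
    row = {}
    for column, py_type in VALUE_SCHEMA:
        value = by_upper.get(column)
        row[column] = None if value is None else py_type(value)
    return row
```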

Create Table As Select

CREATE TABLE product_prices AS
SELECT
    productId,
    LATEST_BY_OFFSET(price) AS price
FROM product_updates
GROUP BY productId;
 Message
---------------------------------------------
 Created query with ID CTAS_PRODUCT_PRICES_3
---------------------------------------------
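The table is keyed by productId. For each key, LATEST_BY_OFFSET(price) keeps the price from the record with the highest offset. Below is a minimal in-memory sketch of that aggregation. It assumes a single input partition, since the demo topic has partitions=1, so iteration order equals offset order. It also assumes that NULL prices are skipped rather than overwriting the last known price; this is stated as an assumption about LATEST_BY_OFFSET's default NULL handling. aggregate_latest_by_offset is a hypothetical name.

```python
from typing import Dict, Iterable, Optional


def aggregate_latest_by_offset(rows: Iterable[dict]) -> Dict[str, Optional[float]]:
    """Simulate GROUP BY productId with LATEST_BY_OFFSET(price).

    Assumes rows arrive in offset order from one partition.
    """
    table: Dict[str, Optional[float]] = {}
    for row in rows:
        key = row.get("PRODUCTID")
        if key is None:
            continue  # a NULL grouping key cannot be grouped, so the row is dropped
        price = row.get("PRICE")
        if price is None:
            table.setdefault(key, None)  # assumption: NULLs never replace a known price
            continue
        table[key] = price  # a later offset wins
    return table
```

Replaying the two kcat messages from this demo yields {"p1": 20.0, "p2": 10.5}. A third message for p1 replaces only p1's entry.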
ksql> LIST QUERIES;

 Query ID              | Query Type | Status    | Sink Name      | Sink Kafka Topic | Query String
-------------------------------------------------------------------------------------------------------------------------------------------------------
 CTAS_PRODUCT_PRICES_3 | PERSISTENT | RUNNING:1 | PRODUCT_PRICES | PRODUCT_PRICES   | CREATE TABLE PRODUCT_PRICES WITH (KAFKA_TOPIC='PRODUCT_PRICES', PARTITIONS=1, REPLICAS=1) AS SELECT   PRODUCT_UPDATES.PRODUCTID PRODUCTID,   LATEST_BY_OFFSET(PRODUCT_UPDATES.PRICE) PRICE FROM PRODUCT_UPDATES PRODUCT_UPDATES GROUP BY PRODUCT_UPDATES.PRODUCTID EMIT CHANGES;
-------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
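The Query String above is not the statement as typed. ksqlDB filled in the WITH clause, qualified the column references and appended EMIT CHANGES. The sketch below shows how the sink defaults could be derived. It makes three assumptions, all consistent with this demo's output. The topic name defaults to the upper-cased table name. PARTITIONS and REPLICAS are inherited from the source topic. Anything given explicitly in WITH takes precedence. effective_sink_properties and render_with_clause are hypothetical helpers, not ksqlDB APIs.

```python
from typing import Dict, Union

Value = Union[str, int]


def effective_sink_properties(table_name: str, explicit: Dict[str, Value],
                              source_partitions: int, source_replicas: int) -> Dict[str, Value]:
    """Combine assumed defaults with properties written in the WITH clause."""
    props: Dict[str, Value] = {
        "KAFKA_TOPIC": table_name.upper(),  # assumption: topic named after the table
        "PARTITIONS": source_partitions,    # assumption: inherited from the source topic
        "REPLICAS": source_replicas,        # assumption: inherited from the source topic
    }
    # Explicit WITH properties override the defaults.
    props.update({k.upper(): v for k, v in explicit.items()})
    return props


def render_with_clause(props: Dict[str, Value]) -> str:
    """Render the properties the way they appear in the Query String column."""
    parts = []
    for key in ("KAFKA_TOPIC", "PARTITIONS", "REPLICAS"):
        value = props[key]
        parts.append(f"{key}='{value}'" if isinstance(value, str) else f"{key}={value}")
    return "WITH (" + ", ".join(parts) + ")"
```

For this demo, render_with_clause(effective_sink_properties("product_prices", {}, 1, 1)) reproduces the WITH clause shown by LIST QUERIES.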

List Tables

ksql> list tables;

 Table Name     | Kafka Topic    | Key Format | Value Format | Windowed
------------------------------------------------------------------------
 PRODUCT_PRICES | PRODUCT_PRICES | KAFKA      | JSON         | false
------------------------------------------------------------------------
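With the KAFKA key format and the JSON value format, each row of PRODUCT_PRICES becomes one Kafka record on the PRODUCT_PRICES topic. The sketch below uses a hypothetical serialize_table_row function. It assumes the STRING key is written as raw UTF-8 bytes. It also assumes the key column stays out of the JSON value, so only PRICE is serialized there.

```python
import json
from typing import Tuple


def serialize_table_row(row: dict) -> Tuple[bytes, bytes]:
    """Encode one PRODUCT_PRICES row as a Kafka (key, value) pair.

    Key (KAFKA format): the STRING key column as plain UTF-8 bytes.
    Value (JSON format): only the value columns; PRODUCTID is carried in the key.
    """
    key = row["PRODUCTID"].encode("utf-8")
    value = json.dumps({"PRICE": row["PRICE"]}).encode("utf-8")
    return key, value
```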

Send Price Updates

In another terminal, execute the following commands:

echo '1:{"productId": "p1", "price": 20.0}' | kcat -P -b :9092 -t product_updates -K :
echo '2:{"productId": "p2", "price": 10.5}' | kcat -P -b :9092 -t product_updates -K :
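kcat's -K : option splits each input line into a message key and a message value. The JSON values themselves contain colons, so the split has to happen at the first delimiter only. The following Python sketch of that split assumes first-occurrence behaviour. split_kcat_line is a hypothetical name, and this sketch rejects lines without a delimiter; that choice is not a claim about what kcat does with them.

```python
from typing import Tuple


def split_kcat_line(line: str, delimiter: str = ":") -> Tuple[str, str]:
    """Split 'key<delimiter>value' at the FIRST delimiter (assumed kcat -K behaviour)."""
    key, sep, value = line.partition(delimiter)
    if not sep:
        raise ValueError(f"no key delimiter {delimiter!r} in line")
    return key, value
```

The keys 1 and 2 end up in the Kafka record key. However, product_updates declares no key column, so only the JSON value contributes to the stream's columns.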

List Prices

SELECT * FROM product_prices;
+-------------------------------------------------------------------------+-------------------------------------------------------------------------+
|PRODUCTID                                                                |PRICE                                                                    |
+-------------------------------------------------------------------------+-------------------------------------------------------------------------+
|p1                                                                       |20.0                                                                     |
|p2                                                                       |10.5                                                                     |
Query terminated
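Without EMIT CHANGES, a SELECT against the table is a pull query. It reads the current state once and then ends, which is why the output finishes with Query terminated. The sketch below models the table's state as a plain dict; in the processing topology this state is the Aggregate-Aggregate-Materialize store. pull_query is a hypothetical function. The sorting only makes the sketch deterministic, and the demo does not establish any particular row order.

```python
from typing import Dict, List, Optional


def pull_query(store: Dict[str, float], product_id: Optional[str] = None) -> List[dict]:
    """Answer a pull query from a snapshot of the table state.

    No product_id: return every row (a scan). With product_id: a single-key lookup.
    """
    if product_id is not None:
        keys = [product_id] if product_id in store else []
    else:
        keys = sorted(store)  # sorted only for deterministic output in this sketch
    return [{"PRODUCTID": k, "PRICE": store[k]} for k in keys]
```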

Emit Price Changes

With EMIT CHANGES, the SELECT becomes a push query. It keeps running and emits a new row every time the price of a product changes.

SELECT * FROM product_prices EMIT CHANGES;
+-------------------------------------------------------------------------+-------------------------------------------------------------------------+
|PRODUCTID                                                                |PRICE                                                                    |
+-------------------------------------------------------------------------+-------------------------------------------------------------------------+
|p1                                                                       |20.0                                                                     |
|p2                                                                       |10.5                                                                     |
In the other terminal, send a new price for product p1:

echo '1:{"productId": "p1", "price": 50.0}' | kcat -P -b :9092 -t product_updates -K :

A new row for product p1 appears at the end of the push query's output:

ksql> SELECT * FROM product_prices EMIT CHANGES;
+-------------------------------------------------------------------------+-------------------------------------------------------------------------+
|PRODUCTID                                                                |PRICE                                                                    |
+-------------------------------------------------------------------------+-------------------------------------------------------------------------+
|p1                                                                       |20.0                                                                     |
|p2                                                                       |10.5                                                                     |
|p1                                                                       |50.0                                                                     |
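A push query behaves like a changelog: each update to the table produces another output row, which is why p1 appears twice. The sketch below replays the three updates from this demo. push_query is a hypothetical name, and the model is idealised. A real push query can coalesce several updates to the same key because of Kafka Streams record caching, so not every intermediate value is guaranteed to appear.

```python
from typing import Dict, Iterable, Iterator, Tuple


def push_query(rows: Iterable[dict]) -> Iterator[Tuple[str, float]]:
    """Yield one (productId, price) change per incoming update (no caching modelled)."""
    latest: Dict[str, float] = {}
    for row in rows:
        key, price = row["PRODUCTID"], row["PRICE"]
        latest[key] = price     # update the table state
        yield key, latest[key]  # and emit the change to the subscriber
```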

Explain Persistent Query

ksql> EXPLAIN CTAS_PRODUCT_PRICES_3;

ID                   : CTAS_PRODUCT_PRICES_3
Query Type           : PERSISTENT
SQL                  : CREATE TABLE PRODUCT_PRICES WITH (KAFKA_TOPIC='PRODUCT_PRICES', PARTITIONS=1, REPLICAS=1) AS SELECT
  PRODUCT_UPDATES.PRODUCTID PRODUCTID,
  LATEST_BY_OFFSET(PRODUCT_UPDATES.PRICE) PRICE
FROM PRODUCT_UPDATES PRODUCT_UPDATES
GROUP BY PRODUCT_UPDATES.PRODUCTID
EMIT CHANGES;
Host Query Status    : {localhost:8088=RUNNING}

 Field     | Type
------------------------------------
 PRODUCTID | VARCHAR(STRING)  (key)
 PRICE     | DOUBLE
------------------------------------

Sources that this query reads from:
-----------------------------------
PRODUCT_UPDATES

For source description please run: DESCRIBE [EXTENDED] <SourceId>

Sinks that this query writes to:
-----------------------------------
PRODUCT_PRICES

For sink description please run: DESCRIBE [EXTENDED] <SinkId>

Execution plan
--------------
 > [ SINK ] | Schema: PRODUCTID STRING KEY, PRICE DOUBLE | Logger: CTAS_PRODUCT_PRICES_3.PRODUCT_PRICES
   > [ PROJECT ] | Schema: PRODUCTID STRING KEY, PRICE DOUBLE | Logger: CTAS_PRODUCT_PRICES_3.Aggregate.Project
     > [ AGGREGATE ] | Schema: PRODUCTID STRING KEY, PRODUCTID STRING, PRICE DOUBLE, KSQL_AGG_VARIABLE_0 DOUBLE | Logger: CTAS_PRODUCT_PRICES_3.Aggregate.Aggregate
       > [ GROUP_BY ] | Schema: PRODUCTID STRING KEY, PRODUCTID STRING, PRICE DOUBLE | Logger: CTAS_PRODUCT_PRICES_3.Aggregate.GroupBy
         > [ PROJECT ] | Schema: PRODUCTID STRING, PRICE DOUBLE | Logger: CTAS_PRODUCT_PRICES_3.Aggregate.Prepare
           > [ SOURCE ] | Schema: PRODUCTID STRING, PRICE DOUBLE, ROWTIME BIGINT, ROWPARTITION INTEGER, ROWOFFSET BIGINT | Logger: CTAS_PRODUCT_PRICES_3.KsqlTopic.Source
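The execution plan reads bottom-up, from SOURCE to SINK. The sketch below chains the steps in that order, with one hypothetical Python generator per step, and carries the schemas from the plan:

- SOURCE adds the ROWTIME, ROWPARTITION and ROWOFFSET pseudo columns.
- The Prepare projection drops them again.
- GROUP_BY makes PRODUCTID the key.
- AGGREGATE keeps the aggregate in KSQL_AGG_VARIABLE_0.
- The final PROJECT exposes that value as PRICE.

The simple "latest value wins" update stands in for LATEST_BY_OFFSET and ignores NULL handling.

```python
from typing import Dict, Iterable, Iterator, List, Tuple

Row = Dict[str, object]


def source(records: Iterable[Tuple[int, Row]]) -> Iterator[Row]:
    # SOURCE: attach pseudo columns (single partition 0, offsets counted from 0)
    for offset, (rowtime, value) in enumerate(records):
        yield {**value, "ROWTIME": rowtime, "ROWPARTITION": 0, "ROWOFFSET": offset}


def prepare(rows: Iterable[Row]) -> Iterator[Row]:
    # PROJECT (Aggregate.Prepare): keep only the columns the aggregation needs
    for r in rows:
        yield {"PRODUCTID": r["PRODUCTID"], "PRICE": r["PRICE"]}


def group_by(rows: Iterable[Row]) -> Iterator[Tuple[str, Row]]:
    # GROUP_BY: the grouping expression becomes the record key
    for r in rows:
        if r["PRODUCTID"] is not None:
            yield r["PRODUCTID"], r


def aggregate(keyed: Iterable[Tuple[str, Row]]) -> Iterator[Tuple[str, Row]]:
    # AGGREGATE: per-key state; KSQL_AGG_VARIABLE_0 holds the aggregate value
    state: Dict[str, Row] = {}
    for key, r in keyed:
        state[key] = {**r, "KSQL_AGG_VARIABLE_0": r["PRICE"]}
        yield key, state[key]


def project(keyed: Iterable[Tuple[str, Row]]) -> Iterator[Tuple[str, Row]]:
    # PROJECT: rename the internal aggregate column to the output column PRICE
    for key, r in keyed:
        yield key, {"PRICE": r["KSQL_AGG_VARIABLE_0"]}


def run_plan(records: Iterable[Tuple[int, Row]]) -> List[Tuple[str, Row]]:
    # SINK: collect what would be written to the PRODUCT_PRICES topic
    return list(project(aggregate(group_by(prepare(source(records))))))
```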


Processing topology
-------------------
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [product_updates])
      --> KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-TRANSFORMVALUES-0000000001 (stores: [])
      --> Aggregate-Prepare
      <-- KSTREAM-SOURCE-0000000000
    Processor: Aggregate-Prepare (stores: [])
      --> KSTREAM-FILTER-0000000003
      <-- KSTREAM-TRANSFORMVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000003 (stores: [])
      --> Aggregate-GroupBy
      <-- Aggregate-Prepare
    Processor: Aggregate-GroupBy (stores: [])
      --> Aggregate-GroupBy-repartition-filter
      <-- KSTREAM-FILTER-0000000003
    Processor: Aggregate-GroupBy-repartition-filter (stores: [])
      --> Aggregate-GroupBy-repartition-sink
      <-- Aggregate-GroupBy
    Sink: Aggregate-GroupBy-repartition-sink (topic: Aggregate-GroupBy-repartition)
      <-- Aggregate-GroupBy-repartition-filter

  Sub-topology: 1
    Source: Aggregate-GroupBy-repartition-source (topics: [Aggregate-GroupBy-repartition])
      --> KSTREAM-AGGREGATE-0000000005
    Processor: KSTREAM-AGGREGATE-0000000005 (stores: [Aggregate-Aggregate-Materialize])
      --> Aggregate-Aggregate-ToOutputSchema
      <-- Aggregate-GroupBy-repartition-source
    Processor: Aggregate-Aggregate-ToOutputSchema (stores: [])
      --> Aggregate-Project
      <-- KSTREAM-AGGREGATE-0000000005
    Processor: Aggregate-Project (stores: [])
      --> KTABLE-TOSTREAM-0000000011
      <-- Aggregate-Aggregate-ToOutputSchema
    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])
      --> KSTREAM-SINK-0000000012
      <-- Aggregate-Project
    Sink: KSTREAM-SINK-0000000012 (topic: PRODUCT_PRICES)
      <-- KTABLE-TOSTREAM-0000000011
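The topology is split into two sub-topologies because GROUP BY productId changes the record key. Sub-topology 0 re-keys the records and writes them to the Aggregate-GroupBy-repartition topic. Sub-topology 1 reads that topic and aggregates into the Aggregate-Aggregate-Materialize store.

Repartitioning matters because every record for a given key must land in the same partition, so that a single task owns that key's state. The sketch below illustrates this. The CRC32-based partition_for is a stand-in, since Kafka's default partitioner uses murmur2, and all function names are hypothetical.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[str, dict]


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash for illustration; Kafka's default partitioner uses murmur2.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def sub_topology_0(values: Iterable[dict], num_partitions: int) -> Dict[int, List[Record]]:
    """Re-key each value by PRODUCTID and 'write' it to a repartition topic."""
    topic: Dict[int, List[Record]] = defaultdict(list)
    for value in values:
        key = value.get("PRODUCTID")
        if key is None:
            continue  # records without a grouping key are dropped before the sink
        topic[partition_for(key, num_partitions)].append((key, value))
    return topic


def sub_topology_1(topic: Dict[int, List[Record]]) -> Tuple[Dict[str, float], List[Tuple[str, float]]]:
    """Read every partition and aggregate into a per-key store, emitting changes."""
    store: Dict[str, float] = {}
    changelog: List[Tuple[str, float]] = []
    for partition in sorted(topic):
        for key, value in topic[partition]:  # per-partition order is preserved
            store[key] = value["PRICE"]
            changelog.append((key, store[key]))
    return store, changelog
```

Since all updates for p1 hash to the same partition, their relative order survives the repartition step. As a result, the final value for p1 is 50.0 no matter how many partitions the repartition topic has.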