
How to combine a group window with “OVER PARTITION BY” in Flink SQL?

<h3>Question</h3>

I'm trying to use time windows in Flink SQL. It has been hard to get familiar with the framework, but I have already set up the following:

<ul><li>StreamExecutionEnvironment</li> <li>StreamTableEnvironment</li> <li>FlinkKafkaConsumer</li> </ul>

I then run a SQL query that groups by a time window, as follows:

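<pre class="lang-scala prettyprint-override">import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.flink.table.api.scala._

// env, tEnv and properties are defined earlier (not shown)
val stream = env.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
val parsed: DataStream[Order] = stream.map(x => ....) // parsing of each record into Order elided

// Register the DataStream as a table (Flink version 1.9.3)
tEnv.registerDataStream("OrdersB", parsed, 'user, 'product, 'amount, 'proctime.proctime)

// Group by 5-second tumbling windows
val result2 = tEnv.sqlQuery(
  """
    |SELECT user, SUM(amount)
    |FROM OrdersB
    |GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), user
    |""".stripMargin)</pre>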
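To make the semantics of the tumbling group window concrete, here is a minimal plain-Scala model (no Flink dependency) of what `GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), user` computes. It is a sketch, not Flink's implementation: the names `TumbleSumSketch` and `TimedOrder` are hypothetical, and it assumes each event carries an explicit timestamp in milliseconds, whereas in Flink the `proctime` attribute is assigned by the runtime.

```scala
// Plain-Scala model of GROUP BY TUMBLE(ts, INTERVAL '5' SECOND), user with SUM(amount).
// Assumption: every event carries an explicit timestamp in milliseconds
// (in Flink, proctime is assigned by the runtime instead).
object TumbleSumSketch {
  final case class TimedOrder(user: Long, product: String, amount: Int, procTimeMs: Long)

  // Start of the tumbling window containing ts; windows are aligned to the epoch.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - Math.floorMod(ts, sizeMs)

  // One output row per (window, user): (windowStart, user, SUM(amount)),
  // sorted by window start and then by user.
  def sumPerUserPerWindow(orders: Seq[TimedOrder], sizeMs: Long): Seq[(Long, Long, Int)] =
    orders
      .groupBy(o => (windowStart(o.procTimeMs, sizeMs), o.user))
      .toSeq
      .map { case ((start, user), rows) => (start, user, rows.map(_.amount).sum) }
      .sortBy { case (start, user, _) => (start, user) }
}
```

Each window start is the timestamp rounded down to a multiple of the window size, so events at 1000 ms and 4999 ms share the window starting at 0, while an event at 5000 ms opens the next one. Note that the grouping collapses each window to one row per user, which is exactly what the question wants to avoid.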

This works, but the reason I'm using Flink SQL is that I need window functions like the ones in Oracle or MySQL.

The query I'm interested in is this.

<pre class="lang-scala prettyprint-override">val result = tEnv.sqlQuery(
  """
    |SELECT user, product, c_num
    |FROM (
    |  SELECT *,
    |    COUNT(user) OVER (PARTITION BY product ORDER BY proctime ASC) as c_num
    |  FROM Orders)
    |""".stripMargin)</pre>

I tried this query with data defined inside the code, and it works.

My challenge is to run this same query over a 5-second time window, as in the first example, but without grouping. I just want each row enriched with a new column holding the value calculated by this function:

<pre class="lang-sql prettyprint-override">COUNT(user) OVER (PARTITION BY product ORDER BY proctime ASC) as c_num</pre>
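Under standard SQL semantics, an `OVER` clause with `ORDER BY` but no explicit frame uses `RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW`, so `c_num` is a running count per product rather than a per-window total. The following plain-Scala model sketches that behavior; `RunningCountSketch` and `Row` are hypothetical names, and it assumes rows arrive in `proctime` order with no two rows sharing a timestamp (ties would make them peers under `RANGE`).

```scala
// Plain-Scala model of COUNT(user) OVER (PARTITION BY product ORDER BY proctime ASC).
// With ORDER BY and no explicit frame, the frame runs from the start of the
// partition to the current row, so the value is a running count that never resets.
// Assumption: rows arrive in proctime order and no two rows share a timestamp.
object RunningCountSketch {
  final case class Row(user: Long, product: String, amount: Int)

  // Returns (user, product, c_num) for each input row, in arrival order.
  def withRunningCount(rows: Seq[Row]): Seq[(Long, String, Long)] = {
    val seen = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
    rows.map { r =>
      seen(r.product) += 1 // one more row seen so far in this product's partition
      (r.user, r.product, seen(r.product))
    }
  }
}
```

On the sample events this yields 1, 2, 3 for the three `beer` rows instead of 3, 3, 3, and the count keeps growing across windows, which is why the unmodified query cannot produce the per-window result the question asks for.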

My data source is a Kafka topic. Here is an example of individual events (user, product, amount):

<pre class="lang-scala prettyprint-override">1,"beer",3
1,"beer",1
2,"beer",3
3,"diaper",4
4,"diaper",1
5,"diaper",5
6,"rubber",2</pre>
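The `map` body that turns each Kafka record into an `Order` is elided in the question. For records in the format above, a parser might look like the following sketch; the `Order` case class, its field types and `parseOrder` are all assumptions, and the parser does not handle commas inside the quoted product name.

```scala
// Hypothetical parser for records of the form: user,"product",amount  (e.g. 1,"beer",3)
// Assumption: Order(user: Long, product: String, amount: Int); the question does not show it.
object OrderParsing {
  final case class Order(user: Long, product: String, amount: Int)

  // Returns None if the line does not have exactly three comma-separated fields
  // or if the numeric fields fail to parse. Quotes around the product are stripped.
  def parseOrder(line: String): Option[Order] =
    line.split(",").map(_.trim) match {
      case Array(user, product, amount) =>
        scala.util.Try(
          Order(user.toLong, product.stripPrefix("\"").stripSuffix("\""), amount.toInt)
        ).toOption
      case _ => None
    }
}
```

In the job, something like `stream.flatMap(line => parseOrder(line).toList)` could then skip malformed records instead of failing, assuming Flink's Scala `DataStream.flatMap` overload that accepts a function returning a collection.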

Assuming all of the events above arrive within the same 5-second window, the result (user, product, c_num) should be:

<pre class="lang-java prettyprint-override">1,"beer",3
1,"beer",3
2,"beer",3
3,"diaper",3
4,"diaper",3
5,"diaper",3
6,"rubber",1</pre>
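The requested output can be stated precisely: count the rows per product inside each 5-second tumbling window, then attach that count to every row of the window instead of collapsing the rows. A minimal plain-Scala model of that, with hypothetical names (`WindowedPartitionCountSketch`, `TimedOrder`, `enrich`) and explicit millisecond timestamps assumed, is:

```scala
// Plain-Scala model (no Flink dependency) of the requested enrichment:
// each row keeps its columns and gains c_num = number of rows with the same
// product inside the same tumbling window.
// Assumption: every row carries an explicit timestamp in milliseconds.
object WindowedPartitionCountSketch {
  final case class TimedOrder(user: Long, product: String, amount: Int, tsMs: Long)

  // Start of the tumbling window containing ts; windows are aligned to the epoch.
  def windowStart(ts: Long, sizeMs: Long): Long = ts - Math.floorMod(ts, sizeMs)

  // Returns (user, product, c_num) in the original arrival order.
  def enrich(rows: Seq[TimedOrder], sizeMs: Long): Seq[(Long, String, Long)] = {
    // Counts per (window, product): the equivalent of a tumbling GROUP BY.
    val counts: Map[(Long, String), Long] =
      rows
        .groupBy(r => (windowStart(r.tsMs, sizeMs), r.product))
        .map { case (key, group) => key -> group.size.toLong }
    // Join the counts back onto every original row instead of collapsing rows.
    rows.map(r => (r.user, r.product, counts((windowStart(r.tsMs, sizeMs), r.product))))
  }
}
```

With all seven sample events stamped inside one window, `enrich` returns exactly the rows listed above. The count is computed in a separate pass and joined back, rather than with a running counter, because a row's `c_num` depends on rows that arrive after it in the same window; in a stream, that value is only final once the window has closed.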

Thank you!

Source: https://stackoverflow.com/questions/61828898/how-to-implement-group-window-function-to-a-over-partition-by-on-flink-sql
