From c1a133d2f9073ec8c589f634957473d844cd9bef Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Fri, 18 Jul 2025 18:23:38 +0200 Subject: [PATCH 1/3] improve resource/topic wait script --- .../create-spark-ingestion-job.yaml | 83 ++++++++++++++++++- 1 file changed, 82 insertions(+), 1 deletion(-) diff --git a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml index 10271f25..6dfdc5e6 100644 --- a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml +++ b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml @@ -11,7 +11,7 @@ spec: spec: serviceAccountName: demo-serviceaccount initContainers: - - name: wait-for-kafka + - name: wait-for-resources image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev command: - bash @@ -23,6 +23,68 @@ spec: kubectl wait --for=condition=ready --timeout=30m pod -l app=minio,release=minio,stackable.tech/vendor=Stackable echo 'Waiting for all kafka brokers to be ready' kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka + echo 'Waiting for all nifi instances to be ready' + kubectl wait --for=condition=ready --timeout=30m pod -l app.kubernetes.io/name=nifi,app.kubernetes.io/instance=nifi + - name: wait-for-kafka-topics + image: oci.stackable.tech/sdp/kafka:3.9.1-stackable0.0.0-dev + command: + - bash + - -euo + - pipefail + - -c + - | + # Configurable + BROKER="${BROKER:-kafka-broker-default-0-listener-broker:9093}" + + log() { + level="$1" + shift + echo "[$level] $*" + } + + check_leaders() { + local topic=$1 + local failed=0 + + log INFO "Starting leader check on Kafka broker: $BROKER for topic: $topic" + metadata=$(kcat -b "$BROKER" -X security.protocol=SSL -X ssl.key.location=/stackable/tls-kcat/tls.key -X ssl.certificate.location=/stackable/tls-kcat/tls.crt -X ssl.ca.location=/stackable/tls-kcat/ca.crt -L -t "$topic" 2>/dev/null) + + if [[ -z "$metadata" ]]; then + log ERROR "Failed to retrieve metadata for topic: $topic" + return 1 + fi + + log DEBUG "Metadata for $topic:" + echo "$metadata" + + if echo "$metadata" | grep -q 'leader: -1'; then + log ERROR "Found 'leader: -1' in topic '$topic'; topic not ready yet!" 
+ return 1 + fi + + if echo "$metadata" | grep -q 'Broker: Leader not available'; then + log ERROR "Topic '$topic' not available yet" + return 1 + fi + + log INFO "Check topic '$topic' was successful" + return 0 + } + + for topic in "shared_bikes_bike_status" "shared_bikes_station_status" "shared_bikes_station_information" "water_levels_measurements" "water_levels_stations" + do + result=$(check_leaders "$topic") + echo "$result" + if [ "$result" == "1" ] + then + exit 1 + fi + done + exit 0 + + volumeMounts: + - name: tls-kcat + mountPath: /stackable/tls-kcat containers: - name: create-spark-ingestion-job image: oci.stackable.tech/sdp/tools:1.0.0-stackable0.0.0-dev @@ -40,7 +102,26 @@ spec: - name: manifest configMap: name: create-spark-ingestion-job-manifest + - name: tls-kcat + ephemeral: + volumeClaimTemplate: + metadata: + annotations: + secrets.stackable.tech/backend.autotls.cert.lifetime: "1d" + secrets.stackable.tech/class: "tls" + secrets.stackable.tech/format: "tls-pem" + secrets.stackable.tech/scope: "pod" + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: "1" + storageClassName: secrets.stackable.tech + volumeMode: Filesystem restartPolicy: OnFailure + securityContext: + fsGroup: 1000 backoffLimit: 50 --- apiVersion: v1 From 03c3f428c56290172ab3000bc414ad6150453517 Mon Sep 17 00:00:00 2001 From: Malte Sander Date: Mon, 21 Jul 2025 08:59:34 +0200 Subject: [PATCH 2/3] remove trailing whitespace --- .../create-spark-ingestion-job.yaml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml index 6dfdc5e6..4b7dc192 100644 --- a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml +++ b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml @@ -75,13 +75,12 @@ spec: do result=$(check_leaders "$topic") echo "$result" - if [ "$result" == "1" ] + if [ "$result" == "1" ] then exit 1 fi done exit 0 - volumeMounts: - name: tls-kcat mountPath: /stackable/tls-kcat From 8b031750c8433cfd41f0f13ec7d59aee62bdfccf Mon Sep 17 00:00:00 2001 From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> Date: Mon, 21 Jul 2025 10:09:16 +0200 Subject: [PATCH 3/3] add comment --- .../create-spark-ingestion-job.yaml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml index 4b7dc192..c35a1ad8 100644 --- a/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml +++ b/demos/data-lakehouse-iceberg-trino-spark/create-spark-ingestion-job.yaml @@ -33,7 +33,13 @@ spec: - pipefail - -c - | - # Configurable + #!/usr/bin/bash + # + # Wait for Kafka topics created by the Nifi workflows to be ready. + # Also wait for all topic partitions to have a leader. + # This is required for the Spark streaming job to be able to read from the topics. + # Without this check, the Spark job might hang indefinitely without processing any events. + BROKER="${BROKER:-kafka-broker-default-0-listener-broker:9093}" log() {
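
A note on the per-topic loop in the wait-for-kafka-topics init container: check_leaders signals readiness through its exit status, but the loop captures its stdout and compares it to the string "1", which never matches; in practice the container still fails because `bash -euo pipefail` aborts on the failing command substitution, although the captured error logs are then never printed. Below is a minimal stand-alone sketch of the same check that branches on the exit status directly. It reuses the broker address, TLS mount path, kcat options, grep patterns, and topic names from the patches above; the topic_ready helper and the TLS_DIR variable are introduced here for illustration only.

#!/usr/bin/env bash
# Sketch only: mirrors the wait-for-kafka-topics check, but acts on the exit
# status of the check function instead of comparing captured stdout to "1".
set -euo pipefail

BROKER="${BROKER:-kafka-broker-default-0-listener-broker:9093}"
TLS_DIR="${TLS_DIR:-/stackable/tls-kcat}"   # same mount path as the tls-kcat volume

topic_ready() {
  local topic="$1" metadata
  # Fetch topic metadata over TLS; treat empty metadata, 'leader: -1' or
  # 'Broker: Leader not available' as "not ready yet".
  metadata=$(kcat -b "$BROKER" \
    -X security.protocol=SSL \
    -X ssl.key.location="$TLS_DIR/tls.key" \
    -X ssl.certificate.location="$TLS_DIR/tls.crt" \
    -X ssl.ca.location="$TLS_DIR/ca.crt" \
    -L -t "$topic" 2>/dev/null) || return 1
  [[ -n "$metadata" ]] || return 1
  ! grep -Eq 'leader: -1|Broker: Leader not available' <<<"$metadata"
}

for topic in shared_bikes_bike_status shared_bikes_station_status \
             shared_bikes_station_information water_levels_measurements \
             water_levels_stations; do
  if ! topic_ready "$topic"; then
    echo "Topic '$topic' has no leader yet"
    exit 1
  fi
  echo "Topic '$topic' is ready"
done
echo "All topics are ready"

This can be run from any pod that mounts the tls-kcat secret volume. With set -e the explicit if is not strictly required, but it keeps the per-topic log output visible before the script exits.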