We extend the previous Helm deployment guide by deploying a more complex configuration of Dagster, which utilizes the CeleryK8sRunLauncher.
In addition to the previous prerequisites, we expect familiarity with Celery, a distributed task queue system.
Component Name | Type | Image |
---|---|---|
Celery | Deployment | dagster/dagster-celery-k8s (released weekly) |
Daemon | Deployment | dagster/dagster-celery-k8s (released weekly) |
Dagit | Deployment behind a Service | dagster/dagster-celery-k8s (released weekly) |
Database | PostgreSQL | postgres (Optional) |
Flower (Optional) | Deployment behind a Service | mher/flower |
Run Worker | Job | User-provided or dagster/user-code-example (released weekly) |
Step Job | Job | User-provided or dagster/user-code-example (released weekly) |
User Code Deployment | Deployment behind a Service | User-provided or dagster/user-code-example (released weekly) |
Dagster uses Celery to provide step-level isolation and to limit the number of concurrent connections to a resource. Users can configure multiple Celery queues (for example, one Celery queue for each resource the user would like to limit) and multiple Celery workers per queue via the runLauncher.config.celeryK8sRunLauncher.workerQueues section of values.yaml.
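As a minimal sketch of such a configuration, the snippet below writes a Helm values override file that declares two queues. The file name `celery-queues.yaml`, the queue name `snowflake-queue`, and the replica counts are hypothetical. The `name` and `replicaCount` keys are assumed from the chart's default values and should be confirmed with `helm show values dagster/dagster` for your chart version.

```shell
# Write a values override with one default queue and one hypothetical
# per-resource queue. The key names under workerQueues are assumptions;
# verify them against the chart's own values.yaml.
cat > celery-queues.yaml <<'EOF'
runLauncher:
  type: CeleryK8sRunLauncher
  config:
    celeryK8sRunLauncher:
      workerQueues:
        - name: "dagster"            # assumed default queue name
          replicaCount: 2            # Celery workers serving this queue
        - name: "snowflake-queue"    # hypothetical queue guarding one resource
          replicaCount: 1            # fewer workers, fewer concurrent connections
EOF
```

The file can be passed to Helm with an extra `-f celery-queues.yaml` argument. Because Helm replaces list values wholesale rather than merging them, the override should list every queue you want, not only the new one.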
The Celery workers poll for new Celery tasks and execute each task in order of receipt or priority. The Celery task largely consists of launching an ephemeral Step Job (Kubernetes Job) to execute that step.
Using Celery requires configuring both the CeleryK8sRunLauncher and the celery_k8s_job_executor.
The Daemon builds on the prior description, but is instead configured with the CeleryK8sRunLauncher.
Dagit builds on the prior description, but is instead configured with the CeleryK8sRunLauncher.
The Database is the same as in the prior description.
Flower is an optional component that can be useful for monitoring Celery queues and workers.
Building off the prior description, the main difference in this deployment is that the Run Worker submits steps that are ready to be executed to the corresponding Celery queue (instead of executing the step itself). As before, the Run Worker is responsible for traversing the execution plan.
The Step Job is responsible for executing a single step, writing the structured events to the database. The Celery worker polls for the Step Job completion.
The User Code Deployment is the same as in the prior description.
We assume that you've followed the initial steps in the previous walkthrough: building a Docker image for your user code, pushing it to a registry, adding the Dagster Helm chart repository, and configuring your Helm User Deployment values.
We need to configure persistent object storage so that data can be serialized and passed between steps. To run the Dagster User Code example, create an S3 bucket named "dagster-test".
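One way to create the bucket is with the AWS CLI. The sketch below assumes the CLI is installed and already has credentials and a default region configured; the helper name `create_dagster_bucket` is hypothetical.

```shell
# Create the S3 bucket used by the example. Defaults to "dagster-test";
# pass another name as the first argument if that one is unavailable.
# Assumes the AWS CLI is installed and configured.
create_dagster_bucket() {
  bucket="${1:-dagster-test}"
  aws s3 mb "s3://${bucket}"
}
```

Call `create_dagster_bucket` with no arguments for the example bucket. S3 bucket names are globally unique, so you may need to pick a different name and use it in the run configuration later.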
To enable Dagster to connect to S3, provide the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables via the env, envConfigMaps, or envSecrets fields under userDeployments in values.yaml, or (not recommended) by setting these variables directly in the User Code Deployment image.
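A sketch of the envSecrets route follows. The Secret name `dagster-aws-credentials` and the helper name `create_aws_secret` are hypothetical, and the values.yaml layout shown in the comments is an assumption to check against your chart version.

```shell
# Store the AWS credentials in a Kubernetes Secret so they stay out of
# values.yaml and the image. Expects AWS_ACCESS_KEY_ID and
# AWS_SECRET_ACCESS_KEY to be set in the current shell and fails if
# either is missing. "dagster-aws-credentials" is a hypothetical name.
create_aws_secret() {
  kubectl create secret generic dagster-aws-credentials \
    --namespace default \
    --from-literal=AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID:?must be set}" \
    --from-literal=AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY:?must be set}"
}

# Then reference the Secret from the user code deployment entry in
# values.yaml (layout assumed; confirm against the chart's values.yaml):
#
#   userDeployments:
#     deployments:
#       - name: "k8s-example-user-code-1"
#         envSecrets:
#           - name: dagster-aws-credentials
```

Keeping credentials in a Secret means they never appear in the values file or in the image layers.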
Install the Helm chart and create a release. Below, we've named our release dagster-release. We use helm upgrade --install to create the release if it does not exist; otherwise, the existing dagster-release will be modified:
helm upgrade --install dagster-release dagster/dagster -f /path/to/values.yaml \
--set runLauncher.type=CeleryK8sRunLauncher \
--set dagsterDaemon.queuedRunCoordinator.enabled=true \
--set rabbitmq.enabled=true
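The same overrides can be kept in a file instead of on the command line, which is easier to review and version. The sketch below is a direct translation of the three `--set` flags; the file name `celery-overrides.yaml` and the helper name `install_dagster_release` are hypothetical.

```shell
# Same overrides as the three --set flags, written to a values file.
cat > celery-overrides.yaml <<'EOF'
runLauncher:
  type: CeleryK8sRunLauncher
dagsterDaemon:
  queuedRunCoordinator:
    enabled: true
rabbitmq:
  enabled: true
EOF

# Install or upgrade the release. When several -f files are given,
# values from the rightmost file take precedence.
install_dagster_release() {
  helm upgrade --install dagster-release dagster/dagster \
    -f /path/to/values.yaml \
    -f celery-overrides.yaml
}
```

Running `install_dagster_release` should be equivalent to the command with `--set` flags.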
Helm will launch several pods. You can check the status of the installation with kubectl. If everything worked correctly, you should see output like the following:
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
dagster-celery-workers-74886cfbfb-m9cbc 1/1 Running 1 3m42s
dagster-daemon-68c4b8d68d-vvpls 1/1 Running 1 3m42s
dagster-dagit-69974dd75b-5m8gg 1/1 Running 0 3m42s
dagster-k8s-example-user-code-1-88764b4f4-25mbd 1/1 Running 0 3m42s
dagster-postgresql-0 1/1 Running 0 3m42s
dagster-rabbitmq-0 1/1 Running 0 3m42s
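Rather than polling by hand, you can block until the pods are ready. This sketch assumes the chart labels its pods with the standard `app.kubernetes.io/instance=<release name>` label; the helper name `wait_for_dagster_pods` and the five-minute timeout are hypothetical choices.

```shell
# Block until every pod belonging to the release reports Ready, or give
# up after five minutes. Assumes pods carry the label
# app.kubernetes.io/instance=<release name>.
wait_for_dagster_pods() {
  release="${1:-dagster-release}"
  kubectl wait pod \
    --namespace default \
    --selector "app.kubernetes.io/instance=${release}" \
    --for=condition=Ready \
    --timeout=300s
}
```

Pods that belong to already-completed Jobs never become Ready, so this is best run right after installation, before any runs have been launched.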
After Helm has successfully installed all the required Kubernetes resources, start port forwarding to the Dagit pod via:
export DAGIT_POD_NAME=$(kubectl get pods --namespace default \
  -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster-release,component=dagit" \
  -o jsonpath="{.items[0].metadata.name}")
kubectl --namespace default port-forward "$DAGIT_POD_NAME" 8080:80
Visit http://127.0.0.1:8080, navigate to the playground, and select the celery_k8s preset. Notice that intermediate_storage.s3.config.s3_bucket is set to dagster-test. You can replace this string with any other accessible S3 bucket. Then, click Launch Execution.
You can inspect the jobs that were launched with kubectl:
$ kubectl get jobs
NAME COMPLETIONS DURATION AGE
dagster-job-9f5c92d1216f636e0d33877560818840 1/1 5s 12s
dagster-job-a1063317b9aac91f42ca9eacec551b6f 1/1 12s 34s
dagster-run-fb6822e5-bf43-476f-9e6c-6f9896cf3fb8 1/1 37s 37s
dagster-job- entries correspond to Step Jobs and dagster-run- entries correspond to Run Workers.
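The prefix convention can be applied mechanically when a cluster holds many jobs. The following sketch labels each Job by its name prefix; the helper names `classify_dagster_job` and `list_dagster_jobs` are hypothetical.

```shell
# Map a Kubernetes Job name to the Dagster component that created it,
# based on the dagster-job- and dagster-run- name prefixes.
classify_dagster_job() {
  case "$1" in
    dagster-job-*) echo "step-job" ;;
    dagster-run-*) echo "run-worker" ;;
    *)             echo "other" ;;
  esac
}

# Print "<kind> <job name>" for every Job in the default namespace.
list_dagster_jobs() {
  kubectl get jobs --namespace default --no-headers \
    -o custom-columns=NAME:.metadata.name |
  while read -r name; do
    echo "$(classify_dagster_job "$name") $name"
  done
}
```

To read the logs of a particular Step Job or Run Worker, pass its name to kubectl, for example `kubectl logs job/<job name>`.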
Within Dagit, you can watch the pipeline's progress update live until it succeeds.
We deployed Dagster, configured with the CeleryK8sRunLauncher, onto a Kubernetes cluster using Helm.