Skip to content

Commit 38e8635

Browse files
Coder2jCoder2j
Coder2j
authored and
Coder2j
committed
feature/video3-dagster-job-and-schedule
* added dagster job * added dagster schedule
1 parent c2ed791 commit 38e8635

File tree

2 files changed

+70
-16
lines changed

2 files changed

+70
-16
lines changed

.pre-commit-config.yaml

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,34 @@
11
# See https://pre-commit.com for more information
22
# See https://pre-commit.com/hooks.html for more hooks
33
repos:
4-
- repo: https://github.com/pre-commit/pre-commit-hooks
5-
rev: v3.2.0
4+
- repo: https://github.com/pre-commit/pre-commit-hooks
5+
rev: v4.5.0
66
hooks:
7-
- id: trailing-whitespace
8-
- id: end-of-file-fixer
9-
- id: check-yaml
10-
- id: check-added-large-files
7+
- id: check-added-large-files # prevents giant files from being committed.
8+
- id: check-case-conflict # checks for files that would conflict in case-insensitive filesystems.
9+
- id: check-merge-conflict # checks for files that contain merge conflict strings.
10+
- id: check-yaml # checks yaml files for parseable syntax.
11+
- id: detect-private-key # detects the presence of private keys.
12+
- id: end-of-file-fixer # ensures that a file is either empty, or ends with one newline.
13+
- id: fix-byte-order-marker # removes utf-8 byte order marker.
14+
- id: mixed-line-ending # replaces or checks mixed line ending.
15+
- id: requirements-txt-fixer # sorts entries in requirements.txt.
16+
- id: trailing-whitespace # trims trailing whitespace.
17+
- repo: https://github.com/pre-commit/mirrors-mypy
18+
rev: "v1.7.0" # Use the sha / tag you want to point at
19+
hooks:
20+
- id: mypy
21+
- repo: https://github.com/pre-commit/mirrors-prettier
22+
rev: "v3.1.0" # Use the sha / tag you want to point at
23+
hooks:
24+
- id: prettier
25+
- repo: https://github.com/psf/black
26+
rev: 23.11.0
27+
hooks:
28+
- id: black
29+
args: [--line-length=79]
30+
- repo: https://github.com/asottile/reorder-python-imports
31+
rev: v3.12.0
32+
hooks:
33+
- id: reorder-python-imports
34+
args: [--py38-plus]

hello-dagster.py

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
from typing import List
2-
from dagster import asset, AssetExecutionContext, AssetIn
2+
3+
from dagster import asset
4+
from dagster import AssetExecutionContext
5+
from dagster import AssetIn
6+
from dagster import AssetSelection
7+
from dagster import define_asset_job
8+
from dagster import Definitions
9+
from dagster import ScheduleDefinition
310

411

512
@asset(key="my_awesome_first_asset", group_name="get_started")
@@ -12,7 +19,10 @@ def my_first_asset(context: AssetExecutionContext):
1219
return [1, 2, 3]
1320

1421

15-
@asset(ins={"upstream": AssetIn(key="my_awesome_first_asset")}, group_name="get_started")
22+
@asset(
23+
ins={"upstream": AssetIn(key="my_awesome_first_asset")},
24+
group_name="get_started",
25+
)
1626
def my_second_asset(context: AssetExecutionContext, upstream: List):
1727
"""
1828
This is our second asset
@@ -22,21 +32,41 @@ def my_second_asset(context: AssetExecutionContext, upstream: List):
2232
return data
2333

2434

25-
@asset(ins={
26-
"first_upstream": AssetIn("my_awesome_first_asset"),
27-
"second_upstream": AssetIn("my_second_asset")
28-
}, group_name="get_started")
35+
@asset(
36+
ins={
37+
"first_upstream": AssetIn("my_awesome_first_asset"),
38+
"second_upstream": AssetIn("my_second_asset"),
39+
},
40+
group_name="get_started",
41+
)
2942
def my_third_asset(
30-
context: AssetExecutionContext,
31-
first_upstream: List,
32-
second_upstream: List):
43+
context: AssetExecutionContext, first_upstream: List, second_upstream: List
44+
):
3345
"""
3446
This is our third asset
3547
"""
3648
data = {
3749
"first_asset": first_upstream,
3850
"second_asset": second_upstream,
39-
"third_asset": second_upstream + [7, 8]
51+
"third_asset": second_upstream + [7, 8],
4052
}
4153
context.log.info(f"Output data is: {data}")
4254
return data
55+
56+
57+
defs = Definitions(
58+
assets=[my_first_asset, my_second_asset, my_third_asset],
59+
jobs=[
60+
define_asset_job(
61+
name="hello_dagster_job",
62+
selection=AssetSelection.groups("get_started"),
63+
)
64+
],
65+
schedules=[
66+
ScheduleDefinition(
67+
name="hello_dagster_schedule",
68+
job_name="hello_dagster_job",
69+
cron_schedule="* * * * *",
70+
)
71+
],
72+
)

0 commit comments

Comments
 (0)