Apache Airflow 數據編排實戰 Data Pipelines with Apache Airflow

[荷] 巴斯·哈倫斯拉克(Bas Harenslak),朱利安·德·瑞特(Julian de Ruiter)著 殷海英 譯

  • Apache Airflow 數據編排實戰-preview-1
  • Apache Airflow 數據編排實戰-preview-2
  • Apache Airflow 數據編排實戰-preview-3
Apache Airflow 數據編排實戰-preview-1

買這商品的人也買了...

商品描述

數據管道通過整合、清理、分析、可視化等方式來管理初始收集的數據流。Apache Airflow提供了一個統一的平臺,可以使用它設計、實施、監控和維護數據的流動。Airflow具有易於使用的UI、即插即用的選項以及靈活的Python腳本,這些都使Airflow能夠非常輕松地完成任何數據管理任務。   在《Apache Airflow 數據編排實戰》中,介紹瞭如何構建和維護有效的數據管道。與你一同探索最常見的使用模式,包括聚合多個數據源、連接到數據湖以及雲端部署。可以將本書作為Airflow的實用指南,本書涵蓋了為Airflow提供動力的有向無環圖(DAG)的各方面知識,以及如何根據工作需求對其進行自定義的技術。 主要內容 ● 構建、測試及部署Airflow管道作為DAG ● 自動對數據進行移動和轉換 ● 使用回填技術分析歷史數據集 ● 開發自定義組件 ● 在生產環境中搭建Airflow

目錄大綱

目    錄

 

第Ⅰ部分  入門

第1章  遇見Apache Airflow 3

1.1  數據管道介紹 3

1.1.1  數據管道的圖形表示 4

1.1.2  運行管道圖 5

1.1.3  管道圖與順序腳本 6

1.1.4  使用工作流管理器運行數據流 8

1.2  Airflow介紹 9

1.2.1  通過Python代碼靈活定義數據管道 9

1.2.2  調度並執行數據管道 10

1.2.3  監控和處理故障 11

1.2.4  增量載入和回填 14

1.3  何時使用Airflow 14

1.3.1  選擇Airflow的原因 14

1.3.2  不使用Airflow的理由 15

1.4  本書的其餘部分 15

1.5  本章小結 16

第2章  Airflow DAG深度解析 17

2.1  從大量數據源中收集數據 17

2.2  編寫你的第一個Airflow DAG 19

2.2.1  任務與operator 22

2.2.2  運行任意Python代碼 23

2.3  在Airflow中運行DAG 25

2.3.1  在Python環境中運行Airflow 25

2.3.2  在Docker容器中運行Airflow 26

2.3.3  使用Airflow圖形界面 27

2.4  運行定時任務 31

2.5  處理失敗的任務 32

2.6  本章小結 34

第3章  Airflow中的調度 35

3.1  示例:處理用戶事件 35

3.2  定期執行DAG 37

3.2.1  使用調度器計劃性運行 37

3.2.2  基於cron的時間間隔 38

3.2.3  基於頻率的時間間隔 40

3.3  增量處理數據 40

3.3.1  獲取增量事件數據 40

3.3.2  使用執行日期的動態時間參考 42

3.3.3  對數據執行分區 43

3.4  理解Airflow的執行日期 45

3.5  使用回填技術填補過去的空白 47

3.6  任務設計的最佳實踐 49

3.6.1  原子性 49

3.6.2  冪等性 51

3.7  本章小結 52

第4章  使用Airflow context對任務進行模板化 53

4.1  為Airflow準備數據 53

4.2  任務context和Jinja模板 55

4.2.1  對operator使用參數模板 56

4.2.2  模板中可用的變量及表達式 57

4.2.3  對PythonOperator使用模板 60

4.2.4  為PythonOperator提供變量 64

4.2.5  檢查模板化參數 66

4.3  連接到其他系統 67

4.4  本章小結 74

第5章  定義任務之間的依賴關系 75

5.1  基本依賴關系 75

5.1.1  線性依賴關系 75

5.1.2  扇入/扇出依賴 77

5.2  分支 79

5.2.1  在任務內部執行分支操作 79

5.2.2  在DAG中使用分支技術 81

5.3  帶有條件的任務 85

5.3.1  在任務內部使用條件 85

5.3.2  對DAG使用條件 86

5.3.3  使用內置operator 88

5.4  觸發條件詳解 88

5.4.1  什麽是觸發規則 88

5.4.2  失敗的影響 89

5.4.3  其他觸發規則 90

5.5  在任務之間共享數據 91

5.5.1  使用XCom共享數據 91

5.5.2  XCom的適用場景 94

5.5.3  使用自定義XCom後端存儲 95

5.6  使用Taskflow API連接Python任務 95

5.6.1  使用Taskflow API簡化Python任務 96

5.6.2  Taskflow API的適用場景 98

5.7  本章小結 99

第Ⅱ部分  Airflow深入學習

第6章  觸發工作流 103

6.1  帶有傳感器的輪詢條件 103

6.1.1  輪詢自定義條件 106

6.1.2  傳感器的異常情況 107

6.2  觸發其他DAG 110

6.2.1  使用TriggerDagRunOperator執行回填操作 114

6.2.2  輪詢其他 DAG 的狀態 114

6.3  使用REST/CLI啟動工作流 117

6.4  本章小結 120

第7章  與外部系統通信 121

7.1  連接到雲服務 122

7.1.1  安裝額外的依賴軟件包 122

7.1.2  開發一個機器學習模型 123

7.1.3  在本地開發外部系統程序 128

7.2  在系統之間移動數據 134

7.2.1  實現PostgresToS3Operator 136

7.2.2  將繁重的任務“外包”出去 139

7.3  本章小結 141

第8章  創建自定義組件 143

8.1  從PythonOperator開始 143

8.1.1  模擬電影評分API 144

8.1.2  從API獲取評分數據 146

8.1.3  構建具體的DAG 149

8.2  創建自定義hook 151

8.2.1  設定自定義hook 151

8.2.2  使用MovielensHook構建DAG 156

8.3  構建自定義operator 158

8.3.1  創建自定義operator 158

8.3.2  創建用於獲取評分數據的operator 159

8.4  創建自定義傳感器 162

8.5  將你的組件打包 165

8.5.1  引導Python包 166

8.5.2  安裝你的Python包 168

8.6  本章小結 169

第9章  測試 171

9.1  開始測試 171

9.1.1  所有DAG的完整性測試 172

9.1.2  設置CI/CD管道 177

9.1.3  編寫單元測試 179

9.1.4  pytest項目結構 180

9.1.5  使用磁盤上的文件測試 184

9.2  在測試中使用DAG和任務context 186

9.3  使用測試進行開發 198

9.4  使用Whirl模擬生產環境 201

9.5  創建DTAP環境 201

9.6  本章小結 201

第10章  在容器中運行任務 203

10.1  同時使用多個不同operator

所面臨的挑戰 203

10.1.1  operator接口和實現 204

10.1.2  復雜且相互沖突的依賴關系 204

10.1.3  轉向通用operator 205

10.2  容器 205

10.2.1  什麽是容器 206

10.2.2  運行第一個Docker容器 207

10.2.3  創建Docker映像 207

10.2.4  使用捲持久化數據 209

10.3  容器與Airflow 212

10.3.1  容器中的任務 212

10.3.2  為什麽使用容器 212

10.4  在Docker中運行任務 213

10.4.1 使用DockerOperator 213

10.4.2  為任務創建容器映像 215

10.4.3  使用Docker任務創建DAG 218

10.4.4  基於Docker的工作流 220

10.5  在Kubernetes中運行任務 221

10.5.1  Kubernetes介紹 221

10.5.2  設置Kubernetes 222

10.5.3  使用KubernetesPodOperator 225

10.5.4  診斷Kubernetes相關的問題 228

10.5.5  與基於docker的工作流的區別 230

10.6  本章小結 231

第Ⅲ部分  Airflow實踐

第11章  最佳實現 235

11.1  編寫清晰的DAG 235

11.1.1  使用風格約定 235

11.1.2  集中管理憑證 239

11.1.3  統一指定配置詳細信息 240

11.1.4  避免在DAG定義中計算 242

11.1.5  使用工廠函數生成通用模式 244

11.1.6  使用任務組對相關任務進行分組 247

11.1.7  為重大變更創建新的DAG 248

11.2  設計可重用的任務 249

11.2.1  要求任務始終滿足冪等性 249

11.2.2  任務結果的確定性 249

11.2.3  使用函數式範式設計任務 250

11.3  高效處理數據 250

11.3.1  限制處理的數據量 250

11.3.2  增量載入與增量處理 252

11.3.3  緩存中間數據 252

11.3.4  不要將數據存儲在本地文件系統 253

11.3.5  將工作卸載到外部系統或源系統 253

11.4  管理資源 254

11.4.1  使用資源池管理並發 254

11.4.2  使用SLA和告警來檢測長時間運行的任務 255

11.5  本章小結 256

第12章  在生產環境中使用Airflow 257

12.1  Airflow架構 258

12.1.1  挑選適合的執行器 259

12.1.2  為Airflow配置metastore 259

12.1.3  深入瞭解調度器 261

12.2  安裝每個執行器 265

12.2.1  設置SequentialExecutor 266

12.2.2  設置LocalExecutor 266

12.2.3  設置CeleryExecutor 267

12.2.4  設置KubernetesExecutor 269

12.3  捕獲所有Airflow進程的日誌 276

12.3.1  捕獲Web服務器輸出 276

12.3.2  捕獲調度器輸出 277

12.3.3  捕獲任務日誌 278

12.3.4  將日誌發送到遠程存儲 278

12.4  可視化及監控Airflow指標 279

12.4.1  從Airflow收集指標 279

12.4.2  配置Airflow以發送指標 280

12.4.3  配置Prometheus以收集指標 281

12.4.4  使用Grafana創建儀表板 283

12.4.5  應監控的指標 285

12.5  如何獲得失敗任務的通知 287

12.5.1  DAG和operator內的告警 287

12.5.2  定義服務級別協議(SLA) 289

12.6  可伸縮性與性能 290

12.6.1  控制最大運行任務數 290

12.6.2  系統性能配置 292

12.6.3  運行多個調度器 292

12.7  本章小結 293

第13章  Airflow安全性 295

13.1  保護Airflow Web界面 296

13.1.1  將用戶添加到RBAC界面 296

13.1.2  配置RBAC界面 299

13.2  加密靜態數據 300

13.3  連接LDAP服務 301

13.3.1  理解LDAP 302

13.3.2  從LDAP服務獲取用戶 304

13.4  加密與Web服務器的通信 305

13.4.1  瞭解HTTPS 305

13.4.2  為HTTPS配置證書 307

13.5  從認證管理系統獲取憑證 311

13.6  本章小結 314

第14章  實戰:探索游覽紐約市的最快方式 315

14.1  理解數據 318

14.1.1  Yellow Cab文件共享 318

14.1.2  Citi Bike REST API 319

14.1.3  確定算法 320

14.2  提取數據 320

14.2.1  下載Citi Bike數據 321

14.2.2  下載Yellow Cab數據 323

14.3  對數據應用類似的轉換 325

14.4  構建數據管道 330

14.5  開發冪等的數據管道 331

14.6  本章小結 333

第Ⅳ部分  在雲端

第15章  Airflow在雲端 337

15.1  設計雲端部署策略 337

15.2  雲端專用的hook和operator 339

15.3  托管服務 340

15.3.1  Astronomer.io 340

15.3.2  Google Cloud Composer 340

15.3.3  適用於Apache Airflow的

Amazon托管工作流 341

15.4  選擇部署策略 342

15.5  本章小結 342

第16章  在AWS中運行Airflow 345

16.1  在AWS中部署Airflow 345

16.1.1  選擇雲服務 345

16.1.2  設計網絡 347

16.1.3  添加DAG同步 347

16.1.4  使用CeleryExecutor擴展 348

16.1.5  後續步驟 349

16.2  針對AWS的hook和operator 350

16.3  用例:使用AWS Athena進行無服務器的電影排名 351

16.3.1  用例概要 352

16.3.2  設置資源 352

16.3.3  創建DAG 355

16.3.4  環境清理 360

16.4  本章小結 361

第17章  在Azure中使用Airflow 363

17.1  在Azure中部署Airflow 363

17.1.1  選擇服務 363

17.1.2  設計網絡 364

17.1.3  使用CeleryExecutor擴展 365

17.1.4  後續步驟 366

17.2  針對Azure設計的hook和operator 367

17.3  示例:在Azure上運行無服務器的電影推薦程序 367

17.3.1  示例概要 368

17.3.2  設定資源 368

17.3.3  創建DAG 372

17.3.4  環境清理 377

17.4  本章小結 378

第18章  在GCP中運行Airflow 379

18.1  在GCP中部署Airflow 379

18.1.1  選擇服務 379

18.1.2  使用Helm在GKE上

部署Airflow 381

18.1.3  與Google服務集成 383

18.1.4  設計網絡 385

18.1.5  通過CeleryExecutor擴展 386

18.2  針對GCP的hook和operator 388

18.3  用例:在GCP上運行無服務器的電影評級 392

18.3.1  上傳到GCS 392

18.3.2  將數據導入BigQuery 394

18.3.3  提取最高評分 396

18.4  本章小結 399

附錄A  運行示例代碼 401

附錄B  Airflow 1和Airflow 2中的包結構 405

附錄C  Prometheus指標映射 409