مقدمه

خوب فرصتی دست داد تا راجع به تجربه کاری که در مورد Airflow داشتیم، بنویسم. بسیاری از راهنماهای کاربری و آموزش ها صرفا به جنبه های و ویژگی های گوناگون مورد آموزش می پردازند. بعضی وقت ها اگر بدانیم یک تکنولوژی به درد چه کار هایی نمی خورد، به مراتب راه گشاتر از دانستن موارد مناسب بکارگیری آن است. چرا که استفاده از یک تکنولوژی در جای نادرست می تواند منجر به افزایش هزینه تولید محصول شود.

اخیرا در یکی از پروژه ها می بایست یک ETL پیاده سازی کنیم. در حقیقت نیاز به Fetch مجموعه متنوعی داده، دستکاری آن و در نهایت ذخیره سازی در پایگاه داده داشتیم. یکی از کاندیداهای پیاده سازی ETL نرم افزار Airflow بود. چرا؟ چون وقتی کلمات کلیدی نظیر etl python و یا data pipeline python رو جستجو کنید هزار و شونصد تا پست میاد و میگه که میشه با Airflow یک پایپلاین دیتا راه اندازی کرد. برای تیم ما مهم بود تکنولوژی که استفاده می کنیم برای این کار مبتنی بر پایتون باشد (که اشتباه می کردیم، حتی اگر نصفه و نیمه پایتون رو هم پشتیبانی می کرد کافی بود). حتی شما وقتی etl airflow رو هم جستجو کنید به تعداد بسیاری مقاله می رسید که به شما می فهمانند که Airflow برای این کار هست. این طور شد که به دام افتادیم! 🕸

معرفی Airflow

در مستندات Airflow می خوانیم:

نرم افزار Airflow یک پلتفورم است که امکان ساخت، زمانبندی و مانیتور گردش‌های کاری را از طریق برنامه نویسی بدست می دهد.

از Airflow می توان برای نوشتن گردش کار به شکل یک گراف بدون دور جهت دار (DAG) از وظایف استفاده کرد. زمانبند Airflow در حالی که وابستگی های مشخص شده در گراف را دنبال می کند، وظایف شما را در قالب یک آرایه از گردش های کاری اجرا می کند.

به بیان دیگر شما وظایف را در قالب یک تابع به زبان پایتون پیاده سازی می کند، از طرف دیگر با همین زبان وابستگی بین وظایف از نظر اجرا را مشخص می کنید که در نتیجه آن گردش کاری تعریف می شود. در نهایت زمابند Airflow این گردش های کاری را اجرا طبق زمانبندی اجرا می کند. یعنی یک Job Scheduler که وابستگی زمانی اجرای بین Jobها رو هم در نظر میگیره. برای یک ETL که قرار هست یه سری کار پشت سر هم روی دیتا اعمال بشه چی از این بهتر! 😏

این پلتفرم یک رابط کاربری در اختیار شما قرار می دهد تا گردش های کاری را مانیتور کنید، اجرای گردش کاری را متوقف و یا آغاز نمایید و یا اقدام به عیب یابی در صورت بروز خطا کنید. Airflow همچنین اولین پیاده سازی موفق از یک الگوی مفید و انعطاف پذیر گردش کاری به شکل کد (Workflow-as-Code) است.

Airflow GUI

اما نکته مهمی که باید به آن توجه کرد این است که Airflow ابزار خوبی برای طراحی گردش های کاری با حرکت کند و اجرای ایستا در یک زمانبندی ثابت است. یعنی

  1. برای جابجایی های سریع بین وظایف و همچنین وظایف سریع ساخته نشده است.
  2. طراحی نشده تا مدام در فرایند کار گردش کاری را تغییر دهید.

در این مقاله قصد آموزش Airflow رو ندارم؛ تا دلتون بخواد منابع خوب برای یادگیری Airflow روی وب پیدا میشه؛ مستندات خوبی هم داره.

بعد از تجربه تلخ بکارگیری Airflow برای راه اندازی یک ETL، و در نهایت کنار گذاشتن آن، اخیرا به مقاله ای بر خوردم که به طور فنی به این مساله پرداخته است که «چرا Airflow نه؟». برای دانستن این مساله بهتر است به مقاله مراجعه فرمایید. این مقاله نسبتا طولانی است و در این پست بنده بخشی از مقاله را به صورت خلاصه، بخصوص مواردی که ما در پروژه آن را تجربه کرده ایم، برجسته کرده ام.

برای چه کارهایی Airflow مناسب نیست؟

در مقاله یاد شده آمده است:

کاربران غالباً با چپاندن مورد استفاده خود در مدل Airflow خودشون رو به دردسر می اندازند. (ما رو میگه! 😭) برای نمونه از مواردی که Airflow نمی تواند به طور قابل قبولی برآورده کند می توان به موارد زیر اشاره کرد:

۱. پیاده سازی DAG هایی که بخواهیم آن ها رو بدون زمانبندی یا خارج از زمانبندی اجرا کنیم (ما به این مشکل خوردیم. در مورد اجرای یک DAG بر اساس تقاضا – OnDemand DAG)
۲. اجرای DAG ها به صورت موازی با زمان شروع یکسان
۳. DAG های حاوی انشعابات با منطق های پیچیده
۴. DAG های با تعداد زیاد وظایف سریع
۵. DAG هایی که بر روی تبادل داده بین وظایف تکیه دارند (این بزرگترین مشکل ما در جابجا کردن دیتا بین وظایف بود، هم از نظر حجم و هم از نظر نوع)
۶. DAG های پارامتری
۷. DAG های پویا

معمولا شرکت های متوسط-تا-بزرگ برای این که بر موارد فوق فائق بیان در نهایت مجبور میشن یک DSL سفارشی شده بنویسند یا پلاگین های بخصوصی رو پیاده سازی کنند تا این دست نیازمندی ها را رفع کند. در نهایت ارتقاء سیستم سخت میشه و سربار نگهداری به طور چشمگیری افزایش پیدا می کند. این یعنی بهتره دنبال ابزار بهتری باشید کاری که ما در نهایت مجبور به انجام آن شدیم. 😒

در ادامه مقاله به نقایص بخش های مختلف Airflow پرداخته که در اینجا تنها به برخی از موارد تجربه شده توسط خود ما اشاره می شود.

جریان داده (Data Flow)

این یک دام است، لطفا گول نخورید. 🕸

با توجه به آنچه که در مقدمه گفته شده، یکی از متداول ترین موارد استفاده از Airflow ایجاد نوعی خط لوله داده است. این خیلی مسخره است؛ و واقعا من موندم چرا اینقدر این مورد تبلیغ میشه. چون اصلا Airflow از جریان داده به طور خیلی خوب (و یا حتی خوب) پشتیبانی نمی کنه.

یه امکانی به اسم XCom، (چه اسم خفنی) برای تبادل قطعات کوچک متادیتا بین وظایف ارائه کرده است. بعدها از روی بررسی schema دیتابیسی که Airflow داره ازش استفاده می کنه متوجه شدیم این XCom در قالب یک جدول پیاده سازی شده! این یعنی اگر شما نیاز به تبادل داده زیاد بین وظایف داشته باشید، یکی از جدول های پایگاه داده شما مدام در حال نوشته شدن و خوانده شده است. عجب! استفاده از جدول به عنوان فضای اشتراکی بین وظایف!

حالا ما چطور می خواستیم از این امکان برای تبادل دیتا (ی بخصوص حجیم) بین وظایف استفاده کنیم. قاعدتا باید یک فضای ذخیره سازی دیگری می داشتیم تا داده ها را در اون ذخیره کنیم و بهشون ID و یا آدرس بدیم و بجای داده این ID یا آدرس را از طریق XCom بین وظایف جابجا کنیم!!!

معایب این مکانیزم:

  • در یک محیط توزیع شده نیاز به یک فضای ذخیره سازی ابری (اشتراکی) برای همه وظایف است.
  • مجوز نوشتن در پایگاه فراداده Airflow
  • این داده ها زمان انقضاء ندارند، این یعنی خودتون باید مدیریتشون کنید
  • XCom وابستگی upstream/downstream شدیدی بین وظایف ایجاد می کند که Airflow و زمانبند از آن اطلاعی ندارند. اگر کاربر دقت نکند Airflow ممکن است این وظایف را به ترتیب اشتباهی انجام دهد.
def puller(**kwargs):
    ti = kwargs['ti']
    
    # get value_1
    v1 = ti.xcom_pull(key=None, task_ids='push')

مشکل دیگه این که تازه کارها، پایگاه داده خود را به خاطر استفاده بیش از حد از XCom از بین می برند. دیده شده کسی یک داده کوچک (۱۰ گیگابایتی!) ایجاد کرده و با کمک XCom آن را بین چند وظیفه جابجا کرده است. حالا اگر ۱۰ تا وظیفه اینجا داشته باشیم هر اجرا ۱۰۰ گیگابایت داده دائمی را در پایگاه داده فراداده XCom ذخیره می کند!!!

یه تعداد محدودیت دیگه هم در مقاله اشاره شده که در ادامه به چندتا از آن ها به طور خلاصه اشاره کردم.

زمانبندی و زمان

یکی از گیج کننده ترین موارد برای تازه واردها به دنیای Airflow مساله زمانبندی است. مقاله جزئیاتی در مورد این امکان بیان می کند؛ به طور خلاصه اگر بخواهید

  • گردش کاری خود را با یک زمانبندی نامنظم یا بدون زمانبندی اجرا کنید،
  • چندین اجرای همزمان از یک گردش کاری را بخواهید اجرا کنید،
  • گردش کاری را تنها برای اجرای دستی نگهداری کنید. (یکی از مواردی که ما به شدت تو کارمون بهش نیاز داشتیم.)

Airflow ابزار درستی برای کارتان نخواهد بود.

سرویس زمانبند Airflow

ستون فقرات Airflow سرویس زمانبند آن است؛ سرویس زمانبند مسئول موارد زیر است:

  • وارسی پوشه DAG ها هر چند ثانیه برای
  • چک کردن زمانبندی DAG ها و تعیین این که آیا یک DAG آماده اجرا است
  • چک کردن تمام وابستگی وظایف برای تعیین این که آیا وظایف برای اجرا آماده اند.
  • ذخیره آخرین وضعیت یک DAG در پایگاه داده

سرویس زمانبند Airflow از مشکلاتی رنج می برد، که می توان به موارد زیر اشاره کرد:

  • سرویس متمرکز زمانبند Airflow تاخیری بین نقطه زمانی برآورده شدن همه پیش نیاز ها و اجرای وظیفه ایجاد کرده است. بنابراین اگر وظایف شما از نظر اجرا طولانی باشند (این تاخیر قابل چشم پوشی است) همه چی روبراه است اما اگر تعداد زیادی وظیفه کوتاه داشته باشید و یا این که زمان برای شما مهم باشد به راحتی زمانبند به گلوگاه تبدیل می شود. ما به این مشکل بر خوردیم، یه مشت جاب کوچیک داشتیم که داده های ورودی رو دستکاری می کرد و انتظار داشتیم به صورت Real-Time اجرا بشن ولی این اتفاق نیافتاد.
  • سرویس زمانبند Airflow ذاتا متمرکز است؛ این مساله آن را به یک نقطه خرابی واحد برای سیستم تبدیل کرده است.
  • این که هر چند ثانیه پوشه DAG ها مجدد بررسی می شود تا تغییرات DAG ها ثبت شود، می تواند منجر به ناسازگاری شود. (حالتی را در نظر بگیرید زمانبند در حال اجرای وظیفه است، زمانی که می خواهد مجددا از خود نمونه سازی کند، در حالی که متوجه می شود دیگر وجود ندارد!)
  • Airflow از اجرا کننده های متعددی نظیر پردازه های محلی، Celery، Dask و Kubernates برای افزایش بهره وری پشتیبانی می کند، اما خود زمانبند مجددا گلوگاه می شود برای اجرا (با تنظیمات پیش فرض) ۱۰ ثانیه برای هر اجرا زمان صرف می شود (۵ ثانیه برای این که وظیفه به صف اجرا برود، ۵ ثانیه هم برای ارسال برای اجرا!).

گردش کار پارامتری در Airflow

یکی دیگه از مسایلی که ما تو کار باهاش مواجه شدیم این بود که DAG ها باید به صورت ایستا تعریف می شدند و امکان اجرای پارامتری آن وجود ندارد. بنابراین برای انجام یه کار مشخص برای منابع دیتای مختلف، API های گوناگون باید DAG های مختلفی با بدنه یکسان تولید می کردیم.

خیلی خوب میشه اگر گردش کاری داشته باشیم که بتونه به ورودی های مختلف پاسخ بده. یه گردش کار ممکنه مراحلی رو داشته باشه که بشه اون ها رو برای اطلاعاتی که از API ها، دیتابیس ها و یا ID های مختلف میاد تکرار کنه. یعنی یه منطق پردازشی یکسان داشته باشیم برای ورودی های مختلف.

این مدل جزء الگوهای اصلی Airflow نیست؛ با این حال با توجه به این که پوشه DAG ها هر چند ثانیه مجددا چک می شود، می تونیم با استفاده از متغیر ها در Airflow این الگو رو پیاده سازی کنیم.

گردش کار پویا

بعضی وقت ها نیاز هست یه وظیفه مشخص در گردش کار به تعداد نامعلومی تکرار شود. برای مثال فرض کنید وظیفه A یه کوئری به دیتابیس میزنه و فهرست مشتریان جدید رو بازیابی می کنه بعد قرار هست CustomerID بره برای وظیفه دیگه ای که یه سری پردازش روش انجام بشه. تنها گزینه پیاده سازی این سناریو در Airflow پیاده سازی یک وظیفه downstream هست که لیستی از ID ها رو به عنوان ورودی بگیره و با یه حلقه روی اون یه تعداد عملیات انجام بده. مشکلش چیه:

  • رابط کاربری جریان کاری پویا رو نمی فهمه، رو همین حساب مانیتورش هم سخت میشه
  • اگر یکی از رکوردها دچار خطا بشه، کل وظیفه به خطا میخوره
  • عمل «تلاش مجدد» رو باید خودت پیاده سازی کنی و سیستم از عمل شما آگاه نمیشه

تخصیص نسخه به گردش های کاری

امکان تخصیص شماره نسخه به DAG به صورت داخلی در Airflow در نظر گرفته نشده است. اگر بخواهید نسخه های قدیمی DAG خود را داشته باشید باید با نام دیگری آن را ذخیره کنید. از طرف دیگه رابط کاربری اصلا چیزی راجع به نسخه DAG شما نمی دونه. در عمل شما باید DAG هاتون رو توی Git مدیریت کنید و در Airflow از روش های قدیمی تر مثل اضافه کردن شماره نسخه به نام فایل DAG برای مشخص کرده نسخه DAG استفاده کنید. یعنی مدیریت نسخه اینقدر فشل!

جایگزین Airflow برای راه اندازی ETL

ما به جایگزین به مراتب بهتری برای راه اندازی یک خط لوله داده رسیده ایم. این جایگزین ذاتا برای این کار طراحی شده است. ما از Apache NiFi برای دریافت داده، پردازش آن و همچنین ذخیره سازی در پایگاه داده استفاده کردیم.

مزایای استفاده از این ابزار عبارت است از:

  • برعکس Airflow که رابط کاربریش تنها یک بازنمایی از دیتابیس هست، کلیه کارهای NiFi از طریق رابط کاربری انجام میشه؛ در واقع NiFi برای انجام همه کارهاش یک REST API ارائه کرده رابط کاربری روش سوار شده. بنابراین کلیه کارهای مربوط به ساخت گردش های کاری، تنظیمات وظایف و پایش گردش های کاری از طریق رابط کاربری انجام میشه
  • ذاتا برای جریان داده طراحی شده است؛ بین هر دو وظیفه یک صف قرار داده شده؛ بنابراین هر زمان که بخواهید می توانید یک وظیفه را متوقف کنید، بدون آن که نگران از دست رفتن داده باشید. پس از انجام تنظیمات دوباره وظیفه را اجرا و داده های درون صف را پردازش کنید. این یعنی در زمان اجرا شما می تونید جریان کاری رو دستکاری کنید.
  • امکان ساخت وظیفه دلخواه را به صورت بومی با کنترل کامل با Java و به صورت اسکریپتی با امکانات محدود با زبان های دیگه مثل Python براش وجود داره
  • برعکس Airflow برای وظایف سریع و با تاخیر کم طراحی شده
  • NiFi به صورت توزیع شده طراحی شده و پارامترهای کلیدی خود را در پایگاه داده توزیع شده Zookeeper نگهداری می کنه
  • امکان تخصیص نسخه به گردش های کاری کاملا پشتیبانی میشه. اصلا شما یه چیزی به اسم NiFi Registry دارید که وصلش می کنید به Git. با تغییر نسخه با یه آپدیت می تویند جریان کاریتون رو بروز کنید.
  • امکان دنبال کردن تمامی تغییرات دیتا از نقطه ورود تا پایان جریان کاری رو دارید (Data Provenance)

ان شاءالله اگر عمری باقی بود، جزئیات بیشتری در پست های بعدی در مورد نای فای ارائه خواهد شد.

مطالب مرتبط

توازن بار با کمک HAProxy
رابط کاربری خط فرمان سفارشی