มารู้จักกับ RxJava และ RxAndroid กันเถอะ [ตอนที่ 2]

อยู่ในระหว่างการปรับปรุงเนื้อหา

หลังจากที่เกริ่นคร่าวๆเกี่ยวกับ ReactiveX ไปใน บทความตอนที่แล้ว ผู้ที่หลงเข้ามาอ่านหลายๆคนอาจจะมีคำถามสงสัยมากมายเกี่ยวกับ Rx

คราวนี้ก็มาทำความรู้จักและการใช้งาน RxJava กับ RxAndroid กันต่อครับ

บทความในซีรีย์เดียวกัน

เริ่มต้นใช้งาน RxJava และ Rx Android


เพิ่ม Dependency ของทั้งสองเข้าไปใน build.gradle ดังนี้

compile 'io.reactivex:rxjava:1.1.8' compile 'io.reactivex:rxandroid:1.2.1'


ถึงแม้ว่า RxAndroid จะมี RxJava เป็น Dependency อยู่แล้ว แต่ก็แนะนำให้ประกาศ RxJava ไว้ใน build.gradle ด้วย เพราะในบางครั้ง RxJava มีการอัปเดตเวอร์ชันใหม่ แต่ RxAndroid ก็ไม่ได้อัปเดตตามเสมอไป ดังนั้นจึงต้องเพิ่มทั้งคู่เพื่อให้มั่นใจว่าจะได้ใช้งาน RxJava กับ RxAndroid เป็นเวอร์ชันใหม่สุดทั้งคู่

มาเข้าใจกับคำสั่งง่ายๆก่อน


เริ่มจากตัวอย่างโค้ดรูปแบบง่ายๆก่อนเนอะ

public List<String> getNameList() { List<String> nameList = Arrays.asList("Cupcake", "Donut", "Eclair", "Froyo", "Gingerbread", "Honeycomb", "Ice Cream Sandwich", "Jelly Bean", "Kitkat", "Lollipop", "Marshmallow", "Nugat"); return nameList; } public void doSomething() { Observable.just(getNameList()) .subscribe(new Observer<List<String>>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.e("Rx", "onError"); } @Override public void onNext(List<String> nameList) { Log.d("Rx", "onNext"); } }); }


ให้ดูที่ getNameList() ก่อนนะ อันนี้เป็นคำสั่งง่ายๆที่จะโยนรายชื่อออกมาเป็น String Array โดยเจ้าของบล็อกจะเอาคำสั่งนี้ไปเรียกใช้งานใน Rx อีกที

และ Observable สามารถเรียกใช้ Static Method ที่ชื่อว่า Just ได้ ก็เลยโยนคำสั่ง getNameList() เข้าไปใส่ในนั้น แล้วผลลัพธ์ที่ได้จากคำสั่งก็ getNameList จะไปทำงานที่ onNext, onCompleted หรือ onError (ขึ้นอยู่กับกรณี) ซึ่งทั้ง 3 ตัวนี้เป็น Override Method ของคลาส Observer

โดย Override Method แต่ละตัวมีหน้าที่ต่างกันดังนี้

onNext


เมื่อ Observable ทำงานเสร็จในแต่ละครั้งจะส่งผลลัพธ์มาที่ Method ตัวนี้

onCompleted


เมื่อ Observable ทำงานเสร็จหมดแล้ว Method นี้ก็จะถูกเรียกเพื่อให้รู้ว่า Observable ทำงานเสร็จแล้ว

onError


เมื่อคำสั่งที่ทำงานเกิดปัญหาใดๆก็ตาม จะเข้ามาที่ Method ตัวนี้แทน onNext และ onCompleted เพื่อที่จะได้จัดการกับ Error ที่เกิดขึ้น

จากโค้ดตัวอย่างเมื่อครู่นี้ หลังจากที่ getNameList() ส่งค่ากลับมา ก็จะเข้า onNext พร้อมๆกับ onCompleted ทันที เพราะ Obserable ทำงานเสร็จแล้ว ส่วน onError ในตัวอย่างนี้จะยังไม่ได้ใช้งานนะ

ดังนั้นผลลัพธ์ที่ได้จะแสดงให้เห็นใน Logcat แบบนี้

D/Rx: onNext D/Rx: onCompleted


และเมื่อดูโค้ดดีๆก็จะเห็นว่าโค้ดมีความสัมพันธ์กันที่ตำแหน่งเหล่านี้

Observable สามารถใช้ได้กับทุกคลาส เวลาโยนคลาสอะไรเข้าไป คลาสนั้นก็จะถูกส่งกลับมาใน Subscriber นั่นหมายความว่าผู้ที่หลงเข้ามาอ่านสามารถเรียกใช้คำสั่งอะไรก็ได้ใน Observable แล้วเดี๋ยวมันจะส่งผลลัพธ์ที่ได้ออกมาใน Subscriber นั่นเอง

คำสั่ง just ก็เป็นหนึ่งในคำสั่งที่ใช้สร้าง Observable ซึ่งยังมีอีกหลายๆคำสั่งที่ใช้ในการสร้าง Observable เช่นกัน แต่เดี๋ยวไว้พูดถึงทีหลังนู้นนนนดีกว่า

และคำสั่ง just ยังสามารถโยนคลาสเข้าไปได้มากกว่าหนึ่งตัวด้วยนะ (แต่ต้องเป็นคลาสตัวเดียวกัน) เช่น

public String getNameList(int index) throws IndexOutOfBoundsException { List<String> nameList = Arrays.asList("Cupcake", "Donut", "Eclair", "Froyo", "Gingerbread", "Honeycomb", "Ice Cream Sandwich", "Jelly Bean", "Kitkat", "Lollipop", "Marshmallow", "Nugat"); return nameList.get(index); } public void doSomething() { Observable.just(getNameList(0), getNameList(2), getNameList(4)) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError"); } @Override public void onNext(String name) { Log.d("Rx", "Name: " + name); } }); }


เจ้าของบล็อกแก้ไขคำสั่ง getNameList ใหม่ ให้สามารถโยน Index เข้าไปได้ เพื่อดึงเฉพาะ String บางตัวออกมาแสดงใน Logcat

เมื่อดูผลลัพธ์ของการทำงานก็จะได้แบบนี้

D/Rx: Name: Cupcake D/Rx: Name: Eclair D/Rx: Name: Gingerbread D/Rx: onCompleted


จะเห็นว่าคำสั่งแต่ละครั้งจะโยนผลลัพธ์มาที่ onNext และเมื่อทำครบทุกคำสั่งแล้วก็จะเข้า onCompleted ทันที

แล้วถ้าเจ้าของบล็อกลองเรียกคำสั่ง getNameList แบบนี้ล่ะ? ใส่ Index เป็น 100 แม่มเล้ยยยย

Observable.just(getNameList(100)) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError"); } @Override public void onNext(String name) { Log.d("Rx", "Name: " + name); } });


มันต้องเข้า onError แน่ๆ มันต้องเข้า onError แน่ๆ มันต้องเข้า onError แน่ๆ

//กดปุ่ม Run แล้วลองทดสอบ

ตู้ม!!

อ้าว onError ไม่ได้มีไว้จัดการกับ Exception หรอกเรอะ!!!

onError มีไว้จัดการกับโค้ดเวลาที่เกิด Exception ก็จริง แต่มันขึ้นอยู่กับวิธีที่สร้าง Observable ขึ้นมาต่างหากล่ะ (อย่างน้อย just ก็ไม่ใช่วิธีสร้าง Observable ที่จะทำให้ onError ทำงานหรอกนะ)

ซึ่ง just เนี่ยเป็นหนึ่งในการทำงานรูปแบบง่ายๆของ Rx ครับ แต่ในการใช้งานจริงๆจะไม่ค่อยได้ใช้กันหรอก ซึ่งเจ้าของบล็อกก็เอามาอธิบายให้เห็นภาพความสัมพันธ์ระหว่าง Observable กับ Subscriber เฉยๆครับ (บทความที่อื่นก็เช่นกัน)

แล้วจะทำยังไงดีให้ onError ทำงานได้ด้วย?


เจ้าของบล็อกก็เลยเปลี่ยนวิธีสร้าง Observable จาก just ให้เป็น fromCallable แทน จากเดิมที่ just จะเรียกคำสั่งนั้นโดยตรงก็เปลี่ยนไปใช้ fromCallbable ที่จะไปเรียกใช้งานคลาสที่ชื่อว่า Callable แทน (เป็นคลาสหนึ่งใน Java) แล้วใน Callable ตัวนั้นก็ให้เรียกคำสั่งที่ต้องการอีกทีหนึ่ง

Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return getNameList(100); } }) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError"); } @Override public void onNext(String name) { Log.d("Rx", "Name: " + name); } });


จะเห็นว่าใน Method ที่ชื่อว่า call จะมีการ Throws Exception ไว้อยู่ด้วย ซึ่งจะทำให้ Exception ใดๆที่เกิดขึ้นในคำสั่ง getNameList ไปเข้าที่ onError ทันที

ดังนั้นผลลัพธ์ที่เกิดขึ้นใน Logcat จะเป็นแบบนี้

D/Rx: onError


อย่าลืมนะ ว่าเมื่อเกิด onError ขึ้น นอกจาก onNext จะไม่ทำงานแล้ว onCompleted ก็จะไม่ทำงานเช่นกัน

มามะ มาเรียกใช้งานแบบจริงๆจังๆกัน (ซะที)


เพื่อให้เห็นภาพว่า Rx จะมาเปลี่ยนชีวิตในการเขียนโค้ดแอนดรอยด์ให้ดีขึ้นได้ยังไง ลองนึกภาพโค้ดบางอย่างที่ Block UI Thread ก่อน ยกตัวอย่างเช่น

public String getUserId() throws InterruptedException { Thread.sleep(5000); return "1234"; }


เป็นโค้ดที่น่าเกลียดชะมัด… แต่ช่างมัน มองข้ามมันไปเถอะ ฮาๆ เพราะอย่างน้อยคำสั่งนี้มันก็ยกตัวอย่างคำสั่งที่ Block UI Thread ได้อยู่เนอะ

จากตัวอย่างเจ้าของบล็อกได้สร้างคำสั่งที่ชื่อว่า getUserId ขึ้นมา เมื่อเรียกใช้คำสั่งนี้จะต้องรอ 5 วินาทีก่อนที่คำสั่งนี้จะโยนค่า String ออกมา

ถ้าเอาคำสั่งนี้ไปเรียกใช้งานในแอนดรอยด์โดยตรงเลยทุกคนคงรู้กันอยู่แล้วว่าแอปฯจะค้าง 5 วินาทีแน่นอน เพราะมันจะไปทำงานอยู่บน UI Thread ของแอปฯ

เพื่อแก้ปัญหาดังกล่าวก็จะใช้ AsycnTask กัน เพื่อให้คำสั่งนี้ทำงานใน Background Thread ซึ่ง AsyncTask เป็นสิ่งหนึ่งในแอนดรอยด์ที่เจ้าของบล็อกไม่ค่อยแฮปปี้ซักเท่าไร เนื่องจากรูปโค้ดไม่สวยงามและทำให้โค้ดดูซับซ้อนยิ่งขึ้น

ลาก่อน AsyncTask, สวัสดี Rx


อย่างที่บอกในตอนแรกๆนู้นว่า Rx มีความยืดหยุ่นในการใช้งานสูงมาก จึงมีคำสั่งให้กำหนดได้ว่าจะคำสั่งทำงานใน Thread ไหน โดยกำหนดด้วยคำสั่ง subscribeOn

Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return getUserId(); } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError"); } @Override public void onNext(String name) { Log.d("Rx", "Name: " + name); } });


subscribeOn(Schedules.io()) จะไปบอกให้ Observable เรียกคำสั่ง getUserId บน Thread ที่อยู่ในรูปแบบของ Thread Pool แทน จึงแก้ปัญหา Block UI Thread ได้อย่างง่ายดาย

ลาก่อนโค้ด AsyncTask แบบเดิมๆที่เคยเขียนกัน…..

ทีนี้อาจจะดูเหมือนว่าโค้ดมันเสร็จสมบูรณ์แล้ว เพราะเจ้าของบล็อกแสดงผลลัพธ์แค่บน Logcat เท่านั้น แต่จริงๆยังไม่สมบูรณ์หรอก ลองเพิ่มโค้ดเข้าไปให้ไปแสดงผลบน Text View ด้วยสิ

TextView tvUserId = ... ... Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return getUserId(); } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError"); tvUserId.setText("-"); } @Override public void onNext(String id) { Log.d("Rx", "onNext"); tvUserId.setText(id); } });


ก็ไม่เห็นมีอะไรนี่ แค่เอาค่าที่ได้ไปแสดงบน Text View แต่ถ้ามี Error อะไรเกิดขึ้นก็แค่ให้ Text View แสดงเป็นเครื่องหมาย — แทน

// กด Run แล้วดูผลการทำงาน



อ่าว ทำไม Text View ไม่แสดงข้อความอะไรเลยล่ะ… แถม Logcat ก็แสดงการทำงานแปลกๆด้วย

D/Rx: onNext D/Rx: onError


onNext มันทำงานก็แปลว่าคำสั่ง getUserId ทำงานเสร็จแล้วนี่ แล้วทำไมเกิด onError ต่อท้ายได้ล่ะนั่น

มาถึงตรงนี้ผู้ที่หลงเข้ามาอ่านอาจจะงงกับสิ่งที่เกิดขึ้น แต่เพื่อให้วิเคราะห์หาสาเหตุต่อไปได้ ให้ลองเช็คก่อนว่า Exception ที่ onError ส่งมาให้มันคือ Exception แบบไหน

@Override public void onError(Throwable e) { Log.d("Rx", "onError : " + e.getMessage()); tvUserId.setText("-"); }


จากนั้นก็ลองดูผลที่เกิดขึ้นใหม่อีกครั้ง

D/Rx: onNext D/Rx: onError : Only the original thread that created a view hierarchy can touch its views.


อ๋อออออออออออ เพราะไปกำหนดให้ Observable ทำงานใน Background Thread นี่เอง ดังนั้นผลลัพธ์ที่ได้เวลาส่งเข้ามาใน onNext, onCompleted หรือ onError ก็ยังคงอยู่บน Background Thread อยู่นี่เองงงงงงงงง

คำสั่งของ View ทุกตัวไม่สามารถเรียกใช้งานใน Background Thread ได้ ต้องเรียกผ่าน UI Thread (Main Thread) เท่านั้น

ทำให้ผู้ที่หลงเข้ามาอ่านส่วนใหญ่ต้องแก้ปัญหาด้วยการเขียนโค้ดเพิ่มแบบนี้

TextView tvUserId = ... ... Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return getUserId(); } }) .subscribeOn(Schedulers.io()) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError : " + e.getMessage()); setUserIdText("-"); } @Override public void onNext(String id) { Log.d("Rx", "onNext"); setUserIdText(id); } }); ... public void setUserIdText(final String id) { runOnUiThread(new Runnable() { @Override public void run() { tvUserId.setText(id); } }); }


รู้สึกถึงความน่าเกลียดของโค้ดมั้ยครับ ต้องมานั่งเรียก UI Thread ด้วยคำสั่ง runOnUiThread (คำสั่งนี้เรียกได้เฉพาะใน Activity เท่านั้น) ทุกครั้งก่อน ถึงจะกำหนดค่าให้กับ Text View ได้

นี่คือสิ่งหนึ่งที่เจ้าของบล็อกโคตรไม่แฮปปี้เวลาเขียนโค้ดแอนดรอยด์เลย…

ในที่สุดก็ถึงเวลาของ RxAndroid แล้ววววววว


เนื่องจากปัญหาที่เกิดขึ้นเป็นปัญหาเฉพาะตัวของแอนดรอยด์ ดังนั้น RxJava จึงไม่สามารถตอบโจทย์ได้หมด เมื่อต้องมาใช้งานบนแอนดรอยด์ จึงเป็นเหตุผลว่าทำไมต้องมี RxAndroid เพิ่มเข้ามาอีก

เอาล่ะ กลับมาที่ปัญหาเดิมกันต่อ เจ้าของบล็อกจะจัดการเรื่อง UI Thread ยังไงดีล่ะ?

ถ้าไปนั่งไล่ดูคำสั่งของ Rx ก็จะพบว่า นอกจากจะกำหนด Thread ที่ต้องการให้กับ Observable ได้แล้ว ยังกำหนด Thread ให้กับ Subscriber ได้อีกด้วยนะ

และที่สำคัญ Thread ของ Observable และ Subscriber ไม่จำเป็นต้องเป็นตัวเดียวกันด้วย!!

สำหรับการกำหนด Thread ให้กับ Subscriber จะใช้คำสั่ง observeOn

TextView tvUserId = ... ... Observable.fromCallable(new Callable<String>() { @Override public String call() throws Exception { return getUserId(); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<String>() { @Override public void onCompleted() { Log.d("Rx", "onCompleted"); } @Override public void onError(Throwable e) { Log.d("Rx", "onError"); tvUserId.setText("-"); } @Override public void onNext(String id) { Log.d("Rx", "onNext"); tvUserId.setText(id); } });


observeOn(AndroidSchedulers.mainThread()) จะกำหนดให้ Subscriber ส่งผลลัพธ์เข้ามาใน onNext, onCompleted และ onError โดยทำงานอยู่บน UI Thread (Main Thread) ทั้งหมด

เฮ้ย ง่ายเกินไปแล้ววว!

โดยคำสั่ง observeOn นั้นเป็นคำสั่งของ RxJava นะ แต่ AndroidSchedulers.mainThread() เป็นคำสั่งของ RxAndroid ซึ่งเป็นที่มาว่าทำไมบนแอนดรอยด์จะใช้ทั้ง RxJava และ RxAndroid ควบคู่กัน

ระวังสับสนคำต่างๆใน Rx


สิ่งหนึ่งที่ผู้ที่หลงเข้ามาอ่านที่พึ่งรู้จักกับ Rx รู้สึกว่าเข้าใจยากนั้นไม่ใช่ Concept ในการทำงานของ Rx หรอก แต่ดันเป็นคำที่ใช้เรียกสิ่งต่างๆใน Rx มากกว่าที่ทำให้สับสน เพราะมีทั้ง Observable, Observer, Observe, Subscriber และ Subscribe

Observable และ Subscriber เป็นเหมือนวัตถุที่จะต้องมีควบคู่กันใน Concept ของ Rx

ส่วน Subscriber และ Observer คือตัวเดียวกันครับ ใน Concept เค้าจะเรียกว่า Subscriber แต่ในโค้ดจะใช้คลาสที่ชื่อว่า Observer

Subscribe คือการกำหนด Subscriber ให้ไปคู่กับ Observable ตัวใดๆก็ตาม

Observe คือช่วงเวลาที่คำสั่งที่กำหนดไว้ใน Observable กำลังทำงาน

ถ้าอ่านแล้วเข้าใจก็ดีใจด้วยครับ แต่ถ้ายังไม่เข้าใจก็ขอโทษด้วยครับ ไม่รู้จะอธิบายยังไงให้เข้าใจง่ายกว่านี้แล้ว ฮาๆ

สรุป


ในตอนที่ 2 นี้เจ้าของบล็อกก็จะเน้นไปที่การอธิบายว่า Observable กับ Subscriber ทั้งต้องทำงานคู่กันนะ ขาดกันไม่ได้ Observable เอาไว้กำหนดค่าหรือคลาสที่ต้องการโยนเข้าไปให้ Subscriber (จะโยนเป็นค่าลงไปตรงๆหรือคำสั่งใดๆก็ทำได้หมด) ส่วนผลลัพธ์ที่เกิดขึ้นใน Observable จะถูกส่งออกมายัง Subscriber ในรูปของ onNext, onCompleted และ onError ซึ่งทั้ง 3 ตัวนี้จะมีหน้าที่แตกต่างกันไป

โดยการสร้าง Observable ในบทความนี้จะยังมีแค่ just กับ fromCallable อยู่ ตอนถัดไปก็คงจะมีผุดขึ้นมาทีละตัวสองตัว

นอกจากความสามารถในการส่งข้อมูลระหว่าง Observable กับ Subscriber แล้ว ทั้ง 2 ตัวยังสามารถกำหนด Thread ที่จะให้ทำงานได้อีกด้วย โดยไม่จำเป็นต้องเป็น Thread เดียวกัน และนักพัฒนาก็ไม่ต้องลงไปจัดการเองด้วย เพราะ Rx จะจัดการให้หมดเลย

และเพื่อไม่ให้สับสนระหว่าง Observable, Observer, Observe, Subscriber และ Subscribe เจ้าของบล็อกก็จะพยายามเลี่ยงคำบางคำออกไปเพื่อให้เหลือแค่ Observable กับ Subscriber เท่านั้นนะครับ

เริ่มน่าสนใจมากขึ้นแล้วใช่มั้ยล่ะ?

นี่ยังเป็นแค่ความสามารถส่วนหนึ่งของ Rx นะ ยังมีความสามารถที่จะช่วยให้นักพัฒนาสะดวกสบายอีกมากมายเลยล่ะ เพราะงั้นก็รอทยอยอ่านใน บทความต่อไป ได้เลยจ้า