Let’s start from the basics and gradually progress towards more advanced RxJS concepts in Angular.

Observables and Observers:

Observables are the foundation of RxJS, representing a stream of data that can be observed over time. Observers are the consumers of these streams, listening for emitted data and reacting accordingly.

Observables and Observers are similar to the concepts of events and event handlers in C#.

Observables:

In Angular, you can create an Observable using the Observable class from the rxjs library. An Observable can emit data asynchronously over time, and Observers can subscribe to it to receive the emitted data.

Example in Angular:

import { Observable } from 'rxjs';

// Create an Observable that emits a stream of numbers
const numbers$ = new Observable(observer => {
  let count = 1;
  const intervalId = setInterval(() => {
    observer.next(count++);
  }, 1000);

  // Cleanup logic when the Observable is unsubscribed
  return () => {
    clearInterval(intervalId);
  };
});

// Subscribe to the Observable to receive the emitted numbers
numbers$.subscribe(value => console.log(value));
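
The cleanup function returned inside the Observable only runs when a subscription ends, and the example above never ends its subscription. A minimal sketch of cancelling one, assuming the same kind of timer-based Observable (the names ticks$ and cleanedUp are illustrative and not part of the original example):

```typescript
import { Observable } from 'rxjs';

// Flag used only to make the cleanup visible in this sketch
let cleanedUp = false;

const ticks$ = new Observable<number>(subscriber => {
  let count = 1;
  const intervalId = setInterval(() => subscriber.next(count++), 1000);

  // Runs when the subscription is closed
  return () => {
    clearInterval(intervalId);
    cleanedUp = true;
  };
});

// subscribe() returns a Subscription that can be cancelled later
const subscription = ticks$.subscribe(value => console.log(value));

// Cancelling runs the cleanup function and stops the timer
subscription.unsubscribe();
console.log(cleanedUp); // true
```

In an Angular component, the unsubscribe() call usually belongs in ngOnDestroy so the timer does not outlive the component.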

Observers:

In Angular, an Observer is an object that defines how to handle the emitted data from an Observable.

It has three optional methods:

next for handling the emitted data

error for handling any errors

complete for handling the completion of the Observable.

Example in Angular:

import { Observable } from 'rxjs';

// Create an Observable that emits a stream of numbers
const numbers$ = new Observable(observer => {
  let count = 1;
  const intervalId = setInterval(() => {
    observer.next(count++);
  }, 1000);

  // Cleanup logic when the Observable is unsubscribed
  return () => {
    clearInterval(intervalId);
  };
});

// Define an Observer to handle the emitted numbers
const observer = {
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('Observable completed')
};

// Subscribe the Observer to the Observable
numbers$.subscribe(observer);
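
The interval-based stream above never completes or fails, so only next is ever called. The following sketch, under the assumption that a synchronous source is acceptable for illustration, shows when complete and error fire (recorder and failing$ are hypothetical names introduced here):

```typescript
import { Observable, of } from 'rxjs';

const log: string[] = [];

// Hypothetical helper: builds an Observer that records every notification
const recorder = (label: string) => ({
  next: (value: number) => log.push(`${label} next ${value}`),
  error: (err: Error) => log.push(`${label} error ${err.message}`),
  complete: () => log.push(`${label} complete`)
});

// of() emits its arguments synchronously, then completes
of(1, 2).subscribe(recorder('A'));

// An Observable that fails: nothing is delivered after error()
const failing$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
  subscriber.next(2); // ignored, the subscription is already closed
});
failing$.subscribe(recorder('B'));

console.log(log);
// ['A next 1', 'A next 2', 'A complete', 'B next 1', 'B error boom']
```

A stream ends with either complete or error, never both, and no next call follows either one.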

Operators:

Operators are functions that allow you to transform, filter, and combine data streams emitted by Observables. They are similar to LINQ operators in C#.

Transformation Operators: Transformation operators allow you to transform the data emitted by an Observable into a different format or structure.

Example in Angular with map operator:

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use map operator to square each number in the stream
const squaredNumbers$ = numbers$.pipe(
  map(num => num * num)
);

// Subscribe to the transformed data stream
squaredNumbers$.subscribe(value => console.log(value));

Filtering Operators: Filtering operators allow you to filter the data emitted by an Observable based on a given condition.

Example in Angular with filter operator:

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use filter operator to get only even numbers from the stream
const evenNumbers$ = numbers$.pipe(
  filter(num => num % 2 === 0)
);

// Subscribe to the filtered even numbers stream
evenNumbers$.subscribe(value => console.log(value));
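
Operators are usually chained inside a single pipe call, with each one receiving the output of the previous one. A small sketch combining the two operators shown so far (the results array exists only to capture the output):

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const results: number[] = [];

of(1, 2, 3, 4, 5)
  .pipe(
    filter(num => num % 2 === 0), // keep 2 and 4
    map(num => num * num)         // square what is left
  )
  .subscribe(value => results.push(value));

console.log(results); // [4, 16]
```

Order matters: swapping the two operators would square every number first and then filter the squares.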

Subjects:

Subjects are both Observables and Observers, which allows you to emit and subscribe to data streams directly.

Subjects are similar to the concept of events in C#, which can be both raised and subscribed to. Angular's own EventEmitter class extends Subject.

Example in Angular with Subject:

import { Subject } from 'rxjs';

// Create a Subject to emit and subscribe to a stream of numbers
const numbersSubject$ = new Subject<number>();

// Subscribe to the Subject to receive emitted numbers
numbersSubject$.subscribe(value => console.log(value));

// Emit numbers to the Subject
numbersSubject$.next(1);
numbersSubject$.next(2);
numbersSubject$.next(3);
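
The example above uses the Subject by calling next directly. Because a Subject is also an Observer, it can instead be handed to another Observable's subscribe call and will relay whatever that source emits. A minimal sketch (subject$, first, and second are illustrative names):

```typescript
import { of, Subject } from 'rxjs';

const subject$ = new Subject<number>();
const first: number[] = [];
const second: number[] = [];

// Observable side: two independent subscribers
subject$.subscribe(value => first.push(value));
subject$.subscribe(value => second.push(value));

// Observer side: the Subject itself is passed to subscribe(),
// so it relays everything of() emits to both subscribers
of(1, 2, 3).subscribe(subject$);

console.log(first, second); // [1, 2, 3] [1, 2, 3]
```

Note that the source's completion is relayed too, so the Subject is finished once of() completes.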

Hot and Cold Observables:

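Observables can be categorized into two types:

  1. Hot
  2. Cold

Hot Observables emit data regardless of whether there are any subscribers, so a late subscriber misses earlier values. Cold Observables only start producing data when subscribed to, and each subscriber gets its own independent run of the sequence.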

Example in Angular with Hot and Cold Observables:

import { interval, fromEvent } from 'rxjs';

// Cold Observable - interval() starts a new timer for each subscriber
const coldObservable$ = interval(1000);

// Hot Observable - clicks happen whether or not anyone is subscribed;
// fromEvent() only attaches a listener to the existing event source
const button = document.querySelector('button') as HTMLButtonElement;
const hotObservable$ = fromEvent(button, 'click');

// Subscribe to both Observables
coldObservable$.subscribe(value => console.log(`Cold: ${value}`));
hotObservable$.subscribe(event => console.log(`Hot: ${event.type}`));
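
The difference can also be observed without a browser. In this sketch, of() serves as a cold source and a Subject stands in for a live, hot source; both choices are assumptions made for illustration:

```typescript
import { of, Subject } from 'rxjs';

// Cold: of() replays its full sequence for every subscriber
const cold$ = of(1, 2, 3);
const coldA: number[] = [];
const coldB: number[] = [];
cold$.subscribe(value => coldA.push(value));
cold$.subscribe(value => coldB.push(value));

// Hot: a Subject stands in for a live source such as DOM clicks
const hot$ = new Subject<number>();
const hotValues: number[] = [];
hot$.next(1); // emitted before anyone subscribed, so it is lost
hot$.subscribe(value => hotValues.push(value));
hot$.next(2);

console.log(coldA, coldB); // [1, 2, 3] [1, 2, 3]
console.log(hotValues);    // [2]
```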

Error handling:

RxJS provides operators for handling errors in Observables, such as catchError and retry, which allow you to handle errors and retries in a stream of data.

Example in Angular with error handling:

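import { of } from 'rxjs';
import { catchError, map, retry } from 'rxjs/operators';

// Create an observable that errors when it reaches a non-numeric value
// (of() alone would just emit the string 'six' without any error)
const numbers$ = of(1, 2, 3, 4, 5, 'six').pipe(
  map(value => {
    if (typeof value !== 'number') {
      throw new Error(`Not a number: ${value}`);
    }
    return value;
  })
);

// Use catchError operator to replace the error with a fallback value
const numbersWithErrorHandled$ = numbers$.pipe(
  catchError(err => of(`Error occurred: ${err.message}`))
);

// Use retry operator to resubscribe to the source in case of error
const numbersWithRetry$ = numbers$.pipe(
  retry(2) // Resubscribe up to 2 times, then pass the error on
);

// Subscribe to the error handled and retried observables
numbersWithErrorHandled$.subscribe(value => console.log(value));
numbersWithRetry$.subscribe({
  next: value => console.log(value),
  error: err => console.error(err.message)
});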

Custom operators:

RxJS allows you to create your own custom operators. An operator is a function that takes a source Observable and returns a new Observable, so the simplest way to write one is to compose existing operators with pipe. This gives you flexibility to create reusable and specialized operators for your specific use cases.

Example in Angular with a custom operator:

import { Observable, OperatorFunction, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Custom operator to multiply each emitted number by a given factor
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    source.pipe(map(num => num * factor));
}

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use the custom multiplyBy operator to multiply each number by 10
const multipliedNumbers$ = numbers$.pipe(multiplyBy(10));

// Subscribe to the multiplied numbers
multipliedNumbers$.subscribe(value => console.log(value));

Schedulers:

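RxJS allows you to control the execution context or scheduler of an Observable. Schedulers determine when notifications are delivered, for example synchronously, in a microtask, or in a later timer callback. They do not move work to another thread; they choose where in the JavaScript event loop a notification runs.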

Example in Angular with schedulers:

import { of, asyncScheduler, asapScheduler, queueScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

// Create an observable that emits a stream of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use observeOn operator to deliver notifications on the asyncScheduler
const asyncNumbers$ = numbers$.pipe(observeOn(asyncScheduler));

// Subscribe to the numbers with asyncScheduler
asyncNumbers$.subscribe(value => console.log(value));

// Use observeOn operator with other schedulers like asapScheduler or queueScheduler
const asapNumbers$ = numbers$.pipe(observeOn(asapScheduler));
const queueNumbers$ = numbers$.pipe(observeOn(queueScheduler));
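
The effect of a scheduler is easiest to see in the order of side effects. This sketch records that order in an array (order is an illustrative name) and compares a plain of() with one routed through observeOn(asyncScheduler):

```typescript
import { of, asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';

const order: string[] = [];

// Without a scheduler, of() delivers its values during subscribe()
of(1, 2).subscribe(value => order.push(`sync ${value}`));
order.push('after sync subscribe');

// With observeOn(asyncScheduler), delivery is deferred to later timer callbacks
of(1, 2)
  .pipe(observeOn(asyncScheduler))
  .subscribe({
    next: value => order.push(`async ${value}`),
    complete: () => console.log(order)
  });
order.push('after async subscribe');
```

When the stream completes, the logged array shows both "sync" values before "after sync subscribe", but both "async" values after "after async subscribe".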

Multicasting:

By default, Observables are unicast, meaning each subscription creates a separate execution of the Observable.

However, you can multicast Observables to share a single execution among multiple subscribers, which can improve performance and reduce duplicated work.

Example in Angular with multicasting:

import { interval } from 'rxjs';
import { share } from 'rxjs/operators';

// Create a multicast Observable that emits numbers every second.
// share() routes one interval through an internal Subject and connects
// automatically when the first subscriber arrives, so no connect() call
// is needed (multicast and refCount are deprecated in RxJS 7)
const numbers$ = interval(1000).pipe(share());

// Both subscribers receive values from the same underlying timer
numbers$.subscribe(value => console.log(`Subscriber 1: ${value}`));
numbers$.subscribe(value => console.log(`Subscriber 2: ${value}`));
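
To make "a separate execution per subscription" concrete, the sketch below counts how often the producer function runs with and without sharing. It assumes the share operator, which multicasts through a Subject with reference counting; the counter names are illustrative:

```typescript
import { Observable } from 'rxjs';
import { share } from 'rxjs/operators';

// Counts how many times the producer function runs
let executions = 0;

const source$ = new Observable<number>(subscriber => {
  executions++;
  subscriber.next(executions);
  // No complete(): the stream stays open, like a timer or socket
});

// Unicast: each subscribe() runs the producer again
source$.subscribe();
source$.subscribe();
const unicastRuns = executions; // 2

// Multicast: share() subscribes to the source once for both subscribers
const shared$ = source$.pipe(share());
shared$.subscribe();
shared$.subscribe();
const multicastRuns = executions - unicastRuns; // 1

console.log({ unicastRuns, multicastRuns });
```

The saving matters most when the producer is expensive, for example an HTTP request or a WebSocket connection.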

Customizing the Observable:

You can also customize the behavior of an Observable by extending the Observable class and implementing your own logic for emitting values, handling errors, and managing subscriptions.

Example in Angular with a custom Observable:

import { Observable } from 'rxjs';

// Custom Observable that emits a sequence of numbers
class MyNumbersObservable extends Observable<number> {
  constructor(maxNumber: number) {
    super(subscriber => {
      // Keep the counter local so every subscription starts from 1
      let currentNumber = 1;
      const intervalId = setInterval(() => {
        if (currentNumber <= maxNumber) {
          subscriber.next(currentNumber++);
        } else {
          subscriber.complete();
        }
      }, 1000);

      // Stop the timer on completion or early unsubscribe
      return () => clearInterval(intervalId);
    });
  }
}

// Create an instance of the custom Observable
const myNumbers$ = new MyNumbersObservable(5);

// Subscribe to the custom Observable
myNumbers$.subscribe(value => console.log(value));

Backpressure:

Backpressure occurs when the rate of emission from an Observable is higher than the rate of consumption by subscribers. RxJS has no built-in signal for a subscriber to slow a producer down, so it is handled with operators instead: bufferTime, throttleTime, debounceTime, and sampleTime, among others, control how data is buffered, dropped, or otherwise managed in high-rate data streams.

Example in Angular with backpressure:

import { interval } from 'rxjs';
import { bufferTime } from 'rxjs/operators';

// Create a fast-emitting Observable that emits numbers every 100ms
const fastNumbers$ = interval(100);

// Use bufferTime operator to buffer emitted numbers for every 1 second
const bufferedNumbers$ = fastNumbers$.pipe(bufferTime(1000));

// Subscribe to the buffered numbers
bufferedNumbers$.subscribe(values => console.log(values));
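
Buffering keeps every value; the other common strategy is to drop values. A minimal sketch using throttleTime with its default settings, where a synchronous of() stands in for a fast producer (an assumption made so the result is easy to inspect):

```typescript
import { of } from 'rxjs';
import { throttleTime } from 'rxjs/operators';

const delivered: number[] = [];

// of() emits all five values in the same tick, far faster than the 1 s window
of(1, 2, 3, 4, 5)
  .pipe(throttleTime(1000)) // pass one value, then ignore the rest for 1 s
  .subscribe(value => delivered.push(value));

console.log(delivered); // [1]
```

Which strategy fits depends on whether every value matters (buffer) or only a recent one does (throttle, debounce, sample).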

Error Handling:

RxJS provides operators for handling errors that may occur in the Observable stream.

You can catch and handle errors, retry failed Observables, and take other actions to gracefully handle errors in your application.

Example in Angular with error handling:

import { of } from 'rxjs';
import { catchError, map, retry } from 'rxjs/operators';

// Create an Observable that throws when it reaches a non-numeric value
const numbers$ = of(1, 2, 3, 4, 5, 'invalid', 7, 8, 9).pipe(
  map(value => {
    if (typeof value !== 'number') {
      throw new Error(`Invalid value: ${value}`);
    }
    return value;
  })
);

// Retry first, then catch: retry must sit before catchError,
// otherwise catchError swallows the error and retry never sees it
const safeNumbers$ = numbers$.pipe(
  retry(2), // Resubscribe to the source up to 2 times
  catchError(error => {
    console.error(`Error: ${error.message}`);
    return of('Error occurred. Continuing with default value.');
  })
);

// Subscribe to the retried and error-handled numbers
safeNumbers$.subscribe(value => console.log(value));

Custom Operators:

RxJS allows you to create custom operators by combining existing operators or by implementing your own logic for transforming or filtering values in the Observable stream. Custom operators can provide reusable and specialized functionality for your specific use cases.

Example in Angular with a custom operator:

import { Observable, OperatorFunction, of } from 'rxjs';

// Custom operator that filters out odd numbers
function filterOutOddNumbers(): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    new Observable<number>(subscriber => {
      // Returning the inner subscription lets unsubscribe reach the source
      return source.subscribe({
        next: value => {
          if (value % 2 === 0) {
            subscriber.next(value);
          }
        },
        // Forward errors and completion so downstream subscribers see them
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
}

// Create an Observable that emits a sequence of numbers
const numbers$ = of(1, 2, 3, 4, 5);

// Use the custom filterOutOddNumbers operator
const filteredNumbers$ = numbers$.pipe(filterOutOddNumbers());

// Subscribe to the filtered numbers
filteredNumbers$.subscribe(value => console.log(value));

This is just a brief overview of some of the basic to advanced concepts and features of RxJS. RxJS is a powerful and flexible library that can greatly simplify and enhance your asynchronous programming in Angular or any other JavaScript environment. I recommend referring to the official RxJS documentation for more in-depth explanations and examples.

I hope this detailed blog post on Observables and Subjects in RxJS, with examples, has been helpful. Remember to apply these concepts wisely in your code and make use of the code examples provided to enhance your understanding.

Happy coding!

Please let me know your thoughts on this story by clapping or leaving a comment with suggestions for future topics.
