[React.js] Stream 데이터 처리하기 - 1

지난 포스팅에 Stream Data 관련해서 글을 작성하였다.

이번 포스팅에서는 해당 내용을 실제로 적용시켜보는 작업을 한다.

 

 

How To Decode?

그런데 stream 데이터를 가져온다고 바로 쓸 수 있는 게 아니라, 변환 및 화면에서 사용하는 방법에 대해 포스팅해보려고 한다.

//api.js
export const retrieveStreamingData = async ({ inputValue, selectedItems }) => {
    try {
        const requestBody = { data: 'some data...' }

        // API 요청
        const response = await fetch(`${Config.baseURL}/api/v-1/retrieveStreamingData`, {
            method: 'POST',
            headers: Config.headers,
            body: JSON.stringify(requestBody),
        })

        useRetrieveStore.getState().setText('') //초기화
        useRetrieveStore.getState().setTitle('') //초기화

        const reader = response.body.getReader()
        const decoder = new TextDecoder()

        while (true) {
            const { done, value } = await reader.read()

            const decodedChunk = decoder.decode(value, { stream: true })
            if (decodedChunk) {
                try {
                    const beforeData = removeLeadingData(decodeUnicodeString(decodedChunk))
                    const afterData = remmoveBackslash(
                        removeFirstAndLastQuotes(decodeUnicodeString(beforeData)),
                    )
                    const cleanData = parseNestedJSON(afterData)

                    if (
                        cleanData?.msg === 'process_generating' &&
                        cleanData?.output?.data[0][0] &&
                        cleanData?.output?.data[0][0].length >= 1
                    ) {
                        if (cleanData?.output?.data[0][0][0] === 'append')
                            //store에 저장
                            useRetrieveStore
                                .getState()
                                .appendText(
                                    cleanData.output.data[0][0][2].replace(/\\n/g, '\n'),
                                )
                    } else if (cleanData?.result || cleanData?.thread_id) {
                        // Streaming 종료, 
                        // title이 마지막에 나오기 때문에 마지막에 title 값을 할당
                        useRetrieveStore
                            .getState()
                            .setTitle(cleanData.result?.created_title ?? '')
                    }
                } catch (error) {
                    console.error('Error:', error)
                }
            }

            //done이 true이면 루프 종료
            if (done) break
        }
    } catch (error) {
        console.error('🚨 [API 요청 중 오류 발생]: ', error.message)
    }
}
//utils.js
// 백슬래시를 제거하되 줄바꿈은 유지
const remmoveBackslash = (str) => str.replace(/\\(?![n\r])/g, '').replace(/""/g, '"')

// 줄바꿈 관련 처리 제거
const removeFirstAndLastQuotes = (str) =>
    str
        .replace(/"{/g, '{')
        .replace(/}"/g, '}')
        .replace(/\\/g, '\\\\')
        .replace(/"/g, '\\"')
        .replace(/\t/g, '\\t')

//유니코드 문자열 디코딩
const decodeUnicodeString = (str) =>
    str.replace(/u([0-9a-fA-F]{4})/g, (_, hex) => String.fromCharCode(parseInt(hex, 16)))

//중첩 JSON 파싱
const parseNestedJSON = (jsonString) => {
    let result = jsonString
    while (typeof result === 'string') {
        try {
            result = JSON.parse(result)
        } catch (e) {
            break
        }
    }
    if (result?.content && typeof result?.content === 'string') {
        result.content = JSON.parse(result.content)
    }
    return result
}

//"data:" 제거
const removeLeadingData = (str) => {
    if (str.startsWith(',data:')) {
        return str.slice(str.indexOf(':') + 1).trim()
    }
    if (str.startsWith('data:')) {
        return str.slice(str.indexOf(':') + 1).trim()
    }
    return str
}

조금 길지만 이런 식으로 사용하였다.

1. api.js

  1. /api/v-1/retrieveStreamingData 로 api 요청.
  2. Stream 데이터로 연속적으로 데이터를 내려주게 됨.
  3. 내려온 데이터들을 지속적으로 store에 저장.
    (while 문을 사용하여 단어로 끊어져서 내려오는 데이터들을 useRetrieveStore  set)
  4. React.js 4번 화면에서 사용 하도록 한다

 

2. utils.js - 데이터 전처리

  • 데이터의 포맷이 정상적으로 내려오지 않아서 1.5일 동안 개고생했다.
    이 부분인데,
  1. removeLeadingData(String) : 데이터의 가장 앞에 "data: " 로 내려오는 경우가 있어 해당 로직을 추가하였다. (",data:" 도 있었다..)
  2. decodeUnicodeString(String) : Unicode 로 된 데이터를 변환하는 작업.
  3. removeFirstAndLastQuotes(String) : Escape 문자열을 처리하는 방법. (진짜 할 때 속터져)
str
.replace(/"{/g, '{')    // 문자열 시작의 '"{' 를 '{' 로 변경 (JSON 객체 시작 부분 정리)
.replace(/}"/g, '}')    // 문자열 끝의 '}"' 를 '}' 로 변경 (JSON 객체 끝 부분 정리)
.replace(/\\/g, '\\\\') // 단일 백슬래시를 이중 백슬래시로 변경 (이스케이프 처리)
.replace(/"/g, '\\"')   // 따옴표를 이스케이프 처리된 따옴표로 변경
.replace(/\t/g, '\\t')  // 탭 문자를 이스케이프 처리된 탭으로 변경

4. parseNestedJSON(String|Object) : 결국 받아온 값들을 사용하려면 JSON.parse(String) 이 필요한데, 재귀함수처럼 사용하였다 (사실은 while 반복문)
한 번 파싱했을 때 한개의 요소만 JSON 파싱이 되는 경우가 있어서 만약 String 형태면 다시 JSON.parse() 해주는 작업을 추가하였다.

 

3. Store - zustand

Store는 React.js 에서 유명한 zustand를 사용하였다.

import { create } from 'zustand'

const useRetrieveStore = create((set) => ({
    retrievedText: '', // 텍스트
    retrievedTitle: '', // 제목

    setRetrievedText: (newText) =>
        set(() => ({
            retrievedText: newText,
        })),

    // title setter
    setRetrievedTitle: (newTitle) =>
        set(() => ({
            retrievedTitle: newTitle,
        })),

    // text appender (setter 변형) - 텍스트를 추가한다.
    appendText: (additionalContent) =>
        set((state) => ({
            retrievedText: state.retrievedText + additionalContent,
        })),
}))

export default useRetrieveStore

4. React.js 화면

Chatting 스타일로 만들었기 때문에 Array로 데이터 구조를 만들었다.

    const retrievedText = useRetrieveStore(
        (state) => state.retrievedText,
    )
    const retrievedTitle = useRetrieveStore(
        (state) => state.retrievedTitle,
    )

    // AI 메시지 추가 - store값을 가져와 실시간으로 반영하는것으로 처리.
    useEffect(() => {
        if (messageList.length === 0 || retrievedText === '') return

        const lastMessage = messageList[messageList.length - 1]

        if (lastMessage.type !== 'ai') {
            setMessageList((prev) => [...prev, { id: Date.now(), text: '', type: 'ai' }])
        } else if (lastMessage.type === 'ai') {
            setMessageList((prev) =>
                prev.map((message, index) =>
                    index === prev.length - 1
                        ? { ...message, text: retrievedText }
                        : message,
                ),
            )
        }
    }, [retrievedText])

    //title 수정 되면 기존의 값에 할당
    useEffect(() => {
        if (messageList.length === 0 || retrievedTitle === '') return
        
        const lastMessage = messageList[messageList.length - 1]

        if (lastMessage.type === 'ai') {
            setMessageList((prev) =>
                prev.map((message, index) =>
                    index === prev.length - 1
                        ? { ...message, title: retrievedTitle }
                        : message,
                ),
            )
        }
    }, [retrievedTitle])

각각 useEffect 를 사용하여 해당 값이 업데이트 되었을 때 리렌더링 되도록 유도하였다.

++ 약간 Vue.js의 watch 와 유사하다고 볼 수 있겠지만,
React.js 의 useEffect가 전반적인 LifeCycle에 관여할 수 있기 때문에 더 큰 범위라고 할 수 있다.

 

반응형
streaming

Stream 데이터란?

우리가 흔히 말하는 유튜버들의 스트리밍이라는 단어가 파생된 근본이다.
Stream Data 는 연속적으로 생성되고 전송되는 데이터이다.
그래서 실시간, 확장성, 대량으로 등 데이터를 다루기 편한 부분이다

뭔가 WebSocket이랑 비슷한 느낌?

React.js 에서 RestAPI로 써보기

const StreamingComponent = () => {
    const [messages, setMessages] = useState([]);

    useEffect(() => {
        const fetchData = async () => {
            const response = await fetch('http://localhost:5000/stream');
            const reader = response.body.getReader();
            const decoder = new TextDecoder();

            let receivedData = [];

            while (true) {
                const { done, value } = await reader.read();
                if (done) break;

                const chunk = decoder.decode(value, { stream: true });
                const lines = chunk.split('\n').filter(line => line.startsWith('data: '));

                for (const line of lines) {
                    const jsonData = JSON.parse(line.replace('data: ', ''));
                    receivedData.push(jsonData);
                    setMessages([...receivedData]); // UI 업데이트
                }
            }
        };

        fetchData();
    }, []);

    return (
        <div>
            <ul>
                {messages.map((msg, index) => (
                    <li key={index}>{index}: {msg.message}</li>
                ))}
            </ul>
        </div>
    );
};

await fetch(url) 하는 것은 일반 RestAPI 와 동일하다.

response.body.getReader() 로 해당 데이터를 가져올 수 있고,
decoder.decode(value, { stream: true }) 디코딩을 한 번 해야한다.

Done

그리고 마지막으로 데이터가 끝나면 done 을 호출하게 된다.
추가적으로 done 에 대한 로직을 구현할 수도 있다.

if (done) {
	alert('Streaming 이 종료되었습니다!')
}

Decoding 을 해야하는 이유

위의 소스를 보면, decoder.decode(value) 를 하고 있는데, 디코딩을 해야하는 이유는 다음과 같다.

  1. 데이터 압축 해제: 스트리밍 과정에서 데이터는 효율적인 전송을 위해 압축되고 인코딩됩니다. 수신 장치에서는 이 데이터를 원래의 형태로 복원하기 위해 디코딩이 필요합니다
  2. 호환성 확보: 인코딩된 데이터는 다양한 장치와 플랫폼에서 재생할 수 있도록 표준화된 형식으로 변환됩니다. 디코딩은 이 표준화된 형식을 각 장치에서 재생 가능한 형태로 변환하는 과정입니다
  3. 실시간 재생: 스트리밍 데이터는 작은 세그먼트로 나누어져 전송됩니다. 각 세그먼트를 수신할 때마다 디코딩하여 즉시 재생할 수 있게 합니다
  4. 품질 최적화: 디코딩 과정에서 네트워크 상태나 장치 성능에 따라 적절한 품질의 스트림을 선택하여 재생할 수 있습니다
  5. 다양한 형식 지원: 트랜스코딩(인코딩과 디코딩의 조합)을 통해 다양한 비디오 형식을 지원하고, 각 사용자의 환경에 맞는 최적의 형식으로 변환할 수 있습니다

WebSocket과 Streaming 의 차이점

왠지 비슷하다고 느껴졌는데, 그 이유가 여기 있었다.
단지 사용처가 조금 다를 뿐(텍스트+양방향, 동영상+단방향) 이었다.

 

반응형

+ Recent posts