@@ -209,3 +209,114 @@ def format_namespace(namespace):
209
209
210
210
# 필요에 따라 최종 결과 반환
211
211
return final_result
212
+
213
+
214
+ async def ainvoke_graph (
215
+ graph : CompiledStateGraph ,
216
+ inputs : dict ,
217
+ config : Optional [RunnableConfig ] = None ,
218
+ node_names : List [str ] = [],
219
+ callback : Optional [Callable ] = None ,
220
+ include_subgraphs : bool = True ,
221
+ ) -> Dict [str , Any ]:
222
+ """
223
+ LangGraph 앱의 실행 결과를 비동기적으로 스트리밍하여 출력하는 함수입니다.
224
+
225
+ Args:
226
+ graph (CompiledStateGraph): 실행할 컴파일된 LangGraph 객체
227
+ inputs (dict): 그래프에 전달할 입력값 딕셔너리
228
+ config (Optional[RunnableConfig]): 실행 설정 (선택적)
229
+ node_names (List[str], optional): 출력할 노드 이름 목록. 기본값은 빈 리스트
230
+ callback (Optional[Callable], optional): 각 청크 처리를 위한 콜백 함수. 기본값은 None
231
+ 콜백 함수는 {"node": str, "content": Any} 형태의 딕셔너리를 인자로 받습니다.
232
+ include_subgraphs (bool, optional): 서브그래프 포함 여부. 기본값은 True
233
+
234
+ Returns:
235
+ Dict[str, Any]: 최종 결과 (마지막 노드의 출력)
236
+ """
237
+ config = config or {}
238
+ final_result = {}
239
+
240
+ def format_namespace (namespace ):
241
+ return namespace [- 1 ].split (":" )[0 ] if len (namespace ) > 0 else "root graph"
242
+
243
+ # subgraphs 매개변수를 통해 서브그래프의 출력도 포함
244
+ async for chunk in graph .astream (
245
+ inputs , config , stream_mode = "updates" , subgraphs = include_subgraphs
246
+ ):
247
+ # 반환 형식에 따라 처리 방법 분기
248
+ if isinstance (chunk , tuple ) and len (chunk ) == 2 :
249
+ # 기존 예상 형식: (namespace, chunk_dict)
250
+ namespace , node_chunks = chunk
251
+ else :
252
+ # 단일 딕셔너리만 반환하는 경우 (REACT 에이전트 등)
253
+ namespace = [] # 빈 네임스페이스 (루트 그래프)
254
+ node_chunks = chunk # chunk 자체가 노드 청크 딕셔너리
255
+
256
+ # 딕셔너리인지 확인하고 항목 처리
257
+ if isinstance (node_chunks , dict ):
258
+ for node_name , node_chunk in node_chunks .items ():
259
+ final_result = {
260
+ "node" : node_name ,
261
+ "content" : node_chunk ,
262
+ "namespace" : namespace ,
263
+ }
264
+
265
+ # node_names가 비어있지 않은 경우에만 필터링
266
+ if node_names and node_name not in node_names :
267
+ continue
268
+
269
+ # 콜백 함수가 있는 경우 실행
270
+ if callback is not None :
271
+ result = callback ({"node" : node_name , "content" : node_chunk })
272
+ # 코루틴인 경우 await
273
+ if hasattr (result , "__await__" ):
274
+ await result
275
+ # 콜백이 없는 경우 기본 출력
276
+ else :
277
+ print ("\n " + "=" * 50 )
278
+ formatted_namespace = format_namespace (namespace )
279
+ if formatted_namespace == "root graph" :
280
+ print (f"🔄 Node: \033 [1;36m{ node_name } \033 [0m 🔄" )
281
+ else :
282
+ print (
283
+ f"🔄 Node: \033 [1;36m{ node_name } \033 [0m in [\033 [1;33m{ formatted_namespace } \033 [0m] 🔄"
284
+ )
285
+ print ("- " * 25 )
286
+
287
+ # 노드의 청크 데이터 출력
288
+ if isinstance (node_chunk , dict ):
289
+ for k , v in node_chunk .items ():
290
+ if isinstance (v , BaseMessage ):
291
+ v .pretty_print ()
292
+ elif isinstance (v , list ):
293
+ for list_item in v :
294
+ if isinstance (list_item , BaseMessage ):
295
+ list_item .pretty_print ()
296
+ else :
297
+ print (list_item )
298
+ elif isinstance (v , dict ):
299
+ for node_chunk_key , node_chunk_value in v .items ():
300
+ print (f"{ node_chunk_key } :\n { node_chunk_value } " )
301
+ else :
302
+ print (f"\033 [1;32m{ k } \033 [0m:\n { v } " )
303
+ elif node_chunk is not None :
304
+ if hasattr (node_chunk , "__iter__" ) and not isinstance (
305
+ node_chunk , str
306
+ ):
307
+ for item in node_chunk :
308
+ print (item )
309
+ else :
310
+ print (node_chunk )
311
+ print ("=" * 50 )
312
+ else :
313
+ # 딕셔너리가 아닌 경우 전체 청크 출력
314
+ print ("\n " + "=" * 50 )
315
+ print (f"🔄 Raw output 🔄" )
316
+ print ("- " * 25 )
317
+ print (node_chunks )
318
+ print ("=" * 50 )
319
+ final_result = {"content" : node_chunks }
320
+
321
+ # 최종 결과 반환
322
+ return final_result
0 commit comments