WPF는 기본적으로 비동기적인 UI 환경이기 때문에 System.Collections.Generic
에 있는 Queue<T>
자료구조를 이용하게 되면 의도와는 다르게 자료가 삽입되거나 빠져나올 수 있습니다.
이에 C#은 Systems.Collections.Concurrent
라는 패키지를 통해 비동기 처리에 대응하는 자료구조들을 제공하고 있는데요, 여기에 ConcurrentQueue<T>
라는 자료구조를 이용하면 됩니다.
하지만 제가 프로그램들을 개발할때는 단순히 ConcurrentQueue<T>
만 이용한다고 문제가 해결되진 않았는데요, 요는 기본적으로 Concurrent 패키지에 있는 자료구조들에는 blocking 알고리즘이 기본으로 제공되지 않아 자료구조 안에서 요소를 가져가려고 할 때 만약 해당 요소가 없다면 그게 자료구조에 들어올때까지 기다리지 않고 그냥 넘어가버리게 됩니다. 해당 패턴은 생산자-소비자 패턴이라고도 하는데요, 한 쪽은 큐에 자료들을 계속 집어넣고 다른 한 쪽은 큐에 자료가 들어있을때 그걸 계속 빼내는 형태를 말합니다.
따라서 해당 패턴을 적용하기 위해선 BlockingCollection<T> 라는 자료구조를 추가로 이용해야합니다.
BlockingCollection<T>는 기본적으로 Add()와 Take() 메소드를 제공하는데요, 여기에 Linq의 Any() 메소드까지 이용하면 쉽게 비동기 생산자-소비자 패턴을 적용할 수 있습니다.
아래에 작성한 코드는 제가 실제로 개발했던 WPF 프로그램의 일부를 축약해서 가져와본건데요, 다음과 같은 기능을 가지고 동작하는 형태입니다.
- 특정 버튼을 누르면 카메라 녹화가 시작된다 (StartRecording() 메소드)
- 카메라로부터 프레임 정보를 받을 때마다 어떤 서버로 데이터를 보내고, 결과값을 받아 큐에 넣는다 (Play() 메소드)
- 녹화가 진행되는 동안 1초 주기로 큐에 들어있는 결과값들을 꺼내 평균을 구하고 이를 UI에 표시한다 (EnqueueFERResult() 메소드)
1초 주기로 EnqueueFERResult() 메소드를 호출하기 위해 System.Threading
패키지에 있는 Timer 클래스를 이용하였는데요, 생성만 하면 바로 무한히 일정 주기로 동작하는 타이머를 아주 쉽게 만들 수 있어 애용하는 클래스입니다.
아무튼 여기서 생산자는 Play() 메소드이고 소비자는 EnqueueFERResult()가 되는데요, 서로 다른 쓰레드에서 비동기적으로 동작하는 상황인데 ConcurrentQueue에 기반한 BlockingCollection를 이용하여 비동기적인 상황에서도 쉽고 안전하게 큐를 비우는 프로그램을 만들 수 있습니다.
이때 큐가 비어있는지 확인하는 함수로 System.Linq
에 있는 확장메소드 Linq를 이용하였는데요, 사실 Any() 메소드는 성능이 좋은 편은 아니어서 TryTake()
를 이용하거나, foreach
와 BlockingCollection.GetConsumingEnumerable()
를 이용하는 것으로 좀더 간단하고 빠른 코드를 만들 수도 있습니다. 다만 저의 경우엔 두 가지 대안 모두 동작을 하지 않아 이런 방법을 이용하였으니 참고해주세요.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 |
using System.Collections.Concurrent; using System.Linq; using System.Threading; // 큐에 넣을 객체를 표현한 클래스 public class FERResult { public double arousal {get;set;} public double valence {get;set;} } // ConcurrentQueue에 기반한 BlockingCollection으로 만든 큐 private readonly BlockingCollection<FERResult> ferQueue = new BlockingCollection<FERResult>(new ConcurrentQueue<FERResult>()); private Timer ferTimer; // 1초 간격으로 ferQueue를 소비하는 Timer private void StartRecording() { if(ferTimer == null) { // 아래 생성자가 끝나는 시점부터 1초 주기로 EnqueFERResult() 메소드가 무한히 호출됨 ferTimer = new Timer(EnqueueFERResult, null, 0, 1000); } } // 어딘가에서 비동기적으로 무한히 호출되는 함수 private void Play() { // 어떤 서버로부터 FERResult 타입의 값을 받는다 var ferResult = ReceiveData(); ferQueue.Add(ferResult); } private void EnqueueFERResult(object state) { int count = 0; double arousalSum = 0; double valenceSum = 0; while(ferQueue.Any()) {// 큐가 비어있지 않으면 해당 시점에 들어있던 모든 요소를 꺼낸다 var result = ferQueue.Take(); count ++; arousalSum += result.arousal; valenceSum += result.valence; } if (count != 0) { // 1초 동안의 평균값을 구함 arousalSum /= count; valenceSum /= count; // WPF UI에 반영 Dispatcher.BeginInvoke(DispatcherPriority.Normal, new System.Action(() => { lbArousalMean1s.Content = arousalSum; lbValenceMean1s.Content = valenceSum; })); } } |