libzypp  17.35.14
asyncqueue.cc
Go to the documentation of this file.
1 #include "private/asyncqueue_p.h"
2 #include <glib-unix.h>
3 #include <ostream>
4 
6 
7 namespace zyppng {
8 
10  { }
11 
13  {
14  std::lock_guard lock(_watchLock);
15  _watches.insert( &watch );
16  }
17 
19  {
20  std::lock_guard lock(_watchLock);
21  _watches.erase( &watch );
22  }
23 
25  {
26  std::lock_guard lock(_watchLock);
27  std::for_each( _watches.begin(), _watches.end(), []( AsyncQueueWatch *w ){
28  w->postNotifyEvent();
29  });
30  }
31 
33  , _queue( std::move(q) )
34  {
35  GError *error = NULL;
36 
37  if (!g_unix_open_pipe (fds, FD_CLOEXEC, &error))
38  ERR << "Creating pipes for AsyncQueueWatch: " << error->message << std::endl;
39 
40  if (!g_unix_set_fd_nonblocking (fds[0], TRUE, &error) ||
41  !g_unix_set_fd_nonblocking (fds[1], TRUE, &error))
42  ERR << "Set pipes non-blocking for AsyncQueueWatch: "<< error->message << std::endl;
43  }
44 
46  {
47  close (fds[0]);
48  close (fds[1]);
49  }
50 
52 
53  AsyncQueueWatch::AsyncQueueWatch(std::shared_ptr<zyppng::AsyncQueueBase> &&queue )
54  : AsyncQueueWatch( *( new AsyncQueueWatchPrivate( std::move(queue), *this ) ) )
55  { }
56 
58  : AbstractEventSource( dd )
59  { }
60 
61  std::shared_ptr<AsyncQueueWatch> AsyncQueueWatch::create( std::shared_ptr<AsyncQueueBase> queue )
62  {
63  std::shared_ptr<AsyncQueueWatch> ptr ( new AsyncQueueWatch( std::move(queue) ) );
64  auto d = ptr->d_func();
65  ptr->updateFdWatch( d->fds[0], AbstractEventSource::Read );
66  d->_queue->addWatch( *ptr );
67  return ptr;
68  }
69 
71  {
72  // sync point, since the queue locks all its watches before changing or notifying them
73  // we should never run into a bad situation where the AsyncQueueWatch is deleted while notified from a different thread.
74  // In case watches are notified this will block and the other way round
75  d_func()->_queue->removeWatch( *this );
76  }
77 
79  {
80  Z_D();
81  int res = -1;
82  guint8 one = 1;
83 
84  do {
85  errno = 0;
86  res = write (d->fds[1], &one, sizeof one);
87  } while (G_UNLIKELY (res == -1 && errno == EINTR));
88  }
89 
91  {
92  return d_func()->_sigMessageAvailable;
93  }
94 
95  void AsyncQueueWatch::onFdReady( int , int )
96  {
97  Z_D();
98  char buffer[16];
99 
100  /* read until it is empty */
101  while (read (d->fds[0], buffer, sizeof buffer) == sizeof buffer);
102  d->_sigMessageAvailable.emit();
103  }
104 
106  {
107  }
108 
109 
110 }
111 
void onFdReady(int fd, int events) override
Definition: asyncqueue.cc:95
void onSignal(int signal) override
Definition: asyncqueue.cc:105
Definition: Arch.h:363
void removeWatch(AsyncQueueWatch &watch)
Definition: asyncqueue.cc:18
#define ERR
Definition: Logger.h:102
#define Z_D()
Definition: zyppglobal.h:105
std::recursive_mutex _watchLock
Definition: asyncqueue.h:35
std::set< AsyncQueueWatch * > _watches
Definition: asyncqueue.h:34
SignalProxy< void()> sigMessageAvailable()
Definition: asyncqueue.cc:90
void addWatch(AsyncQueueWatch &watch)
Definition: asyncqueue.cc:12
virtual ~AsyncQueueBase()
Definition: asyncqueue.cc:9
~AsyncQueueWatch() override
Definition: asyncqueue.cc:70
static std::shared_ptr< AsyncQueueWatch > create(std::shared_ptr< AsyncQueueBase > queue)
Definition: asyncqueue.cc:61
AsyncQueueWatchPrivate(std::shared_ptr< AsyncQueueBase > &&q, AsyncQueueWatch &p)
Definition: asyncqueue.cc:32
ZYPP_IMPL_PRIVATE(UnixSignalSource)
std::map< std::string, std::string > read(const Pathname &_path)
Read sysconfig file path_r and return (key,valye) pairs.
Definition: sysconfig.cc:34
bool write(const Pathname &path_r, const std::string &key_r, const std::string &val_r, const std::string &newcomment_r)
Add or change a value in sysconfig file path_r.
Definition: sysconfig.cc:80
AsyncQueueWatch(std::shared_ptr< AsyncQueueBase > &&queue)