summaryrefslogtreecommitdiff
path: root/adapters/glib.h
blob: e13eee73beaed592026d8236e3b3b0496b2f825f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#ifndef __HIREDIS_GLIB_H__
#define __HIREDIS_GLIB_H__

#include <glib.h>

#include "../hiredis.h"
#include "../async.h"

/*
 * Custom GSource that couples a hiredis async context to a GLib main loop.
 *
 * The embedded GSource is the first member so that a RedisSource pointer
 * and a GSource pointer can be cast to one another, as the functions in
 * this header do.
 */
typedef struct
{
    GSource source;          /* base GSource; must remain the first member */
    redisAsyncContext *ac;   /* async context whose read/write handlers are dispatched */
    GPollFD poll_fd;         /* descriptor and event mask registered via g_source_add_poll() */
} RedisSource;

/*
 * Installed as the hiredis "addRead" event hook.
 *
 * Adds G_IO_IN to the polled event mask of the RedisSource passed as
 * `data`, then wakes the main context the source belongs to (presumably so
 * that a poll already in progress notices the updated mask).
 */
static void
redis_source_add_read (gpointer data)
{
    RedisSource *source = (RedisSource *)data;
    GMainContext *context;

    g_return_if_fail(source);

    source->poll_fd.events = source->poll_fd.events | G_IO_IN;

    context = g_source_get_context((GSource *)data);
    g_main_context_wakeup(context);
}

/*
 * Installed as the hiredis "delRead" event hook.
 *
 * Drops G_IO_IN from the polled event mask of the RedisSource passed as
 * `data`, then wakes the main context the source belongs to.
 */
static void
redis_source_del_read (gpointer data)
{
    RedisSource *source = (RedisSource *)data;
    GMainContext *context;

    g_return_if_fail(source);

    source->poll_fd.events = source->poll_fd.events & ~G_IO_IN;

    context = g_source_get_context((GSource *)data);
    g_main_context_wakeup(context);
}

/*
 * Installed as the hiredis "addWrite" event hook.
 *
 * Adds G_IO_OUT to the polled event mask of the RedisSource passed as
 * `data`, then wakes the main context the source belongs to.
 */
static void
redis_source_add_write (gpointer data)
{
    RedisSource *source = (RedisSource *)data;
    GMainContext *context;

    g_return_if_fail(source);

    source->poll_fd.events = source->poll_fd.events | G_IO_OUT;

    context = g_source_get_context((GSource *)data);
    g_main_context_wakeup(context);
}

/*
 * Installed as the hiredis "delWrite" event hook.
 *
 * Drops G_IO_OUT from the polled event mask of the RedisSource passed as
 * `data`, then wakes the main context the source belongs to.
 */
static void
redis_source_del_write (gpointer data)
{
    RedisSource *source = (RedisSource *)data;
    GMainContext *context;

    g_return_if_fail(source);

    source->poll_fd.events = source->poll_fd.events & ~G_IO_OUT;

    context = g_source_get_context((GSource *)data);
    g_main_context_wakeup(context);
}

/*
 * Installed as the hiredis "cleanup" event hook.
 *
 * Clears both read and write interest, then unregisters the GPollFD from
 * the source if it is still registered (fd >= 0) and marks it as removed
 * by setting fd to -1.
 */
static void
redis_source_cleanup (gpointer data)
{
    RedisSource *source = (RedisSource *)data;

    g_return_if_fail(source);

    redis_source_del_read(source);
    redis_source_del_write(source);

    /* Nothing left to do when the GPollFD has already been removed. */
    if (source->poll_fd.fd < 0) {
        return;
    }

    /*
     * Detaching the source from its main loop is left to whoever
     * attached it; only the GPollFD is withdrawn here.
     */
    g_source_remove_poll((GSource *)data, &source->poll_fd);
    source->poll_fd.fd = -1;
}

/*
 * GSourceFuncs.prepare implementation.
 *
 * Sets *timeout_ to -1 (no timeout requested by this source) and reports
 * whether any event we are currently interested in is already flagged in
 * revents.
 *
 * Returns TRUE when (events & revents) is non-zero, FALSE otherwise.
 */
static gboolean
redis_source_prepare (GSource *source,
                      gint    *timeout_)
{
    const RedisSource *redis = (const RedisSource *)source;
    gboolean ready;

    *timeout_ = -1;

    ready = ((redis->poll_fd.events & redis->poll_fd.revents) != 0);
    return ready;
}

/*
 * GSourceFuncs.check implementation.
 *
 * Returns TRUE when at least one event in the interest mask (events) has
 * been reported in revents, FALSE otherwise.
 */
static gboolean
redis_source_check (GSource *source)
{
    const RedisSource *redis = (const RedisSource *)source;
    gboolean ready;

    ready = ((redis->poll_fd.events & redis->poll_fd.revents) != 0);
    return ready;
}

/*
 * GSourceFuncs.dispatch implementation.
 *
 * Forwards pending write and read readiness to hiredis and clears the
 * corresponding revents bits. If a user callback was set on the source it
 * is invoked afterwards and its return value decides whether the source
 * stays alive; without a callback the source is always kept (TRUE).
 *
 * Each handler call is guarded by poll_fd.fd >= 0. redis_source_cleanup()
 * sets poll_fd.fd to -1, and hiredis may run that hook (and release the
 * async context) from inside a handler, e.g. on a connection error. The
 * guard prevents passing the stale redis->ac to the next handler.
 *
 * source:    the RedisSource being dispatched.
 * callback:  optional user callback set on the source, may be NULL.
 * user_data: argument passed to callback.
 */
static gboolean
redis_source_dispatch (GSource      *source,
                       GSourceFunc   callback,
                       gpointer      user_data)
{
    RedisSource *redis = (RedisSource *)source;

    if ((redis->poll_fd.fd >= 0) && (redis->poll_fd.revents & G_IO_OUT)) {
        redisAsyncHandleWrite(redis->ac);
        redis->poll_fd.revents &= ~G_IO_OUT;
    }

    /* Re-check fd: the write handler may have torn the connection down. */
    if ((redis->poll_fd.fd >= 0) && (redis->poll_fd.revents & G_IO_IN)) {
        redisAsyncHandleRead(redis->ac);
        redis->poll_fd.revents &= ~G_IO_IN;
    }

    if (callback) {
        return callback(user_data);
    }

    return TRUE;
}

/*
 * GSourceFuncs.finalize implementation.
 *
 * Unregisters the GPollFD from the source if it is still registered
 * (fd >= 0) and marks it as removed by setting fd to -1.
 */
static void
redis_source_finalize (GSource *source)
{
    RedisSource *redis = (RedisSource *)source;

    /* Already removed earlier (fd was reset to -1): nothing to do. */
    if (redis->poll_fd.fd < 0) {
        return;
    }

    g_source_remove_poll(source, &redis->poll_fd);
    redis->poll_fd.fd = -1;
}

/*
 * Creates a GSource that drives the given hiredis async context.
 *
 * The new source polls the context's file descriptor with an initially
 * empty event mask, and the context's event hooks (addRead, delRead,
 * addWrite, delWrite, cleanup) are pointed at the functions in this header
 * with the new source as their data argument.
 *
 * ac: async context to adapt; must not be NULL.
 *
 * Returns the new source as a GSource pointer, or NULL (after the
 * g_return_val_if_fail diagnostic) when ac is NULL. This function does not
 * attach the source to any main context.
 */
static GSource *
redis_source_new (redisAsyncContext *ac)
{
    static GSourceFuncs source_funcs = {
        .prepare  = redis_source_prepare,
        .check     = redis_source_check,
        .dispatch = redis_source_dispatch,
        .finalize = redis_source_finalize,
    };
    RedisSource *source;

    /* Validate ac before any member of it is touched. */
    g_return_val_if_fail(ac != NULL, NULL);

    source = (RedisSource *)g_source_new(&source_funcs, sizeof *source);
    source->ac = ac;
    source->poll_fd.fd = ac->c.fd;
    source->poll_fd.events = 0;
    source->poll_fd.revents = 0;
    g_source_add_poll((GSource *)source, &source->poll_fd);

    ac->ev.addRead = redis_source_add_read;
    ac->ev.delRead = redis_source_del_read;
    ac->ev.addWrite = redis_source_add_write;
    ac->ev.delWrite = redis_source_del_write;
    ac->ev.cleanup = redis_source_cleanup;
    ac->ev.data = source;

    return (GSource *)source;
}

#endif /* __HIREDIS_GLIB_H__ */