summaryrefslogtreecommitdiff
path: root/cache/src/object_store_client.cpp
blob: 58eb3fbde1bb55532206d67f25b9f3cedb572c42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <net/if.h>
#include <unistd.h> 
#include <stdio.h> 
#include <stdlib.h>
#include <assert.h>
#include <errno.h>
#include <sys/prctl.h>
#include <string.h>
#include <pthread.h>

#include "object_store_client.h"
#include "tango_cache_tools.h"

enum CACHE_ERR_CODE object_store_get_last_error(const struct cache_evbase_ctx *ctx)
{
	return cache_evbase_get_last_error(ctx);
}

void object_store_get_statistics(const struct object_store_instance *instance, struct cache_statistics *out)
{
	struct cache_statistics out_cache;

	memset(out, 0, sizeof(struct cache_statistics));
	for(u_int32_t i=0; i<instance->instance_num; i++)
	{
		cache_evbase_get_statistics(instance->instances[i], &out_cache);
		
		out->del_error_num += out_cache.del_error_num;
		out->del_recv_num  += out_cache.del_recv_num;
		out->del_succ_num  += out_cache.del_succ_num;
		out->get_err_http  += out_cache.get_err_http;
		out->get_err_redis += out_cache.get_err_redis;
		out->get_miss_num  += out_cache.get_miss_num;
		out->get_recv_num  += out_cache.get_recv_num;
		out->get_succ_http += out_cache.get_succ_http;
		out->get_succ_redis+= out_cache.get_succ_redis;
		out->put_err_http  += out_cache.put_err_http;
		out->put_err_redis += out_cache.put_err_redis;
		out->put_recv_num  += out_cache.put_recv_num;
		out->put_succ_http += out_cache.put_succ_http;
		out->put_succ_redis+= out_cache.put_succ_redis;
		out->session_http  += out_cache.session_http;
		out->session_redis += out_cache.session_redis;
		out->memory_used   += out_cache.memory_used;
		out->totaldrop_num += out_cache.totaldrop_num;
	}
}

struct tango_cache_result *object_store_read_result(void *promise_result)
{
	return cache_evbase_read_result(promise_result);
}

int object_store_update_end(struct cache_evbase_ctx *ctx, char *path/*OUT*/, size_t pathsize)
{
	return cache_evbase_update_end(ctx, path, pathsize);
}

void object_store_update_cancel(struct cache_evbase_ctx *ctx)
{
	cache_evbase_update_cancel(ctx);
}

int object_store_update_frag_data(struct cache_evbase_ctx *ctx, enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size)
{
	return cache_evbase_update_frag_data(ctx, way, data, size);
}

int object_store_update_frag_evbuf(struct cache_evbase_ctx *ctx, struct evbuffer *evbuf)
{
	return cache_evbase_update_frag_evbuf(ctx, evbuf);
}

struct cache_evbase_ctx *object_store_update_start(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_put *meta)
{
	return cache_evbase_update_start(instance->instances[rand()%instance->instance_num], f, meta);
}

int object_store_upload_once_data(struct object_store_instance *instance, struct future* f, 
		enum PUT_MEMORY_COPY_WAY way, const char *data, size_t size, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
	return cache_evbase_upload_once_data(instance->instances[rand()%instance->instance_num], f, way, data, size, meta, path, pathsize);
}

int object_store_upload_once_evbuf(struct object_store_instance *instance, struct future* f, 
		struct evbuffer *evbuf, struct tango_cache_meta_put *meta, char *path, size_t pathsize)
{
	return cache_evbase_upload_once_evbuf(instance->instances[rand()%instance->instance_num], f, evbuf, meta, path, pathsize);
}

int object_store_fetch_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta, enum OBJECT_LOCATION where_to_get)
{
	return cache_evbase_fetch_object(instance->instances[rand()%instance->instance_num], f, meta, where_to_get);
}

int object_store_head_object(struct object_store_instance *instance, struct future* f, struct tango_cache_meta_get *meta)
{
	return cache_evbase_head_object(instance->instances[rand()%instance->instance_num], f, meta);
}

int object_store_delete_object(struct object_store_instance *instance, struct future* f, const char *objkey, const char *minio_addr, const char *bucket)
{
	return cache_evbase_delete_object(instance->instances[rand()%instance->instance_num], f, objkey, minio_addr, bucket);
}

struct object_store_instance *object_store_instance_new(const char* profile_path, const char* section, int thread_num, void *runtimelog)
{
	struct object_store_instance *object_instance;
	struct tango_cache_parameter *parameter;

	parameter = tango_cache_parameter_new(profile_path, section, runtimelog);
	if(parameter == NULL)
	{
		return NULL;
	}

	object_instance = (struct object_store_instance *)calloc(1, sizeof(struct object_store_instance));
	object_instance->instance_num = thread_num;
	object_instance->instances = (struct cache_evbase_instance **)calloc(1, sizeof(struct cache_evbase_instance *)*object_instance->instance_num);

	for(u_int32_t i=0; i<object_instance->instance_num; i++)
	{
		object_instance->instances[i] = cache_evbase_instance_new(parameter, runtimelog);
		if(object_instance->instances[i] == NULL)
		{
			free(parameter);
			free(object_instance);
			return NULL;
		}
	}
	srandom(time(NULL));
	return object_instance;
}

void object_store_global_init(void)
{
	cache_evbase_global_init();
}