5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150 | class GraphDB:
"""
A class for interacting with a Neo4j graph database.
Parameters
----------
uri : str
The URI of the Neo4j database.
user : str
The username for authentication.
password : str
The password for authentication.
Attributes
----------
driver : neo4j.Driver
The Neo4j driver instance for managing database connections.
"""
def __init__(self, uri: str, user: str, password: str):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
def close(self):
"""
Close the connection to the Neo4j database.
Returns
-------
None
"""
self.driver.close()
def write_data(self, query: str, parameters: dict = None):
"""
Execute a write query on the Neo4j database.
Parameters
----------
query : str
The Cypher query to execute.
parameters : dict, optional
A dictionary of parameters to pass with the query.
Returns
-------
None
"""
with self.driver.session() as session:
session.write_transaction(self._execute_query, query, parameters)
def read_data(self, query: str, parameters: dict = None) -> list:
"""
Execute a read query on the Neo4j database and return the results.
Parameters
----------
query : str
The Cypher query to execute.
parameters : dict, optional
A dictionary of parameters to pass with the query.
Returns
-------
list
A list of records returned by the query.
"""
with self.driver.session() as session:
result = session.read_transaction(
self._execute_query, query, parameters)
return list(result)
@staticmethod
def _execute_query(tx, query: str, parameters: dict = None):
"""
Helper method to execute a query within a transaction.
Parameters
----------
tx : neo4j.Transaction
The transaction object.
query : str
The Cypher query to execute.
parameters : dict, optional
A dictionary of parameters to pass with the query.
Returns
-------
neo4j.Result
The result of the query execution.
"""
return tx.run(query, parameters)
def write_key_value(self, key: str, value: str):
"""
Execute a write query on the Neo4j database.
Parameters
----------
key : str
The key to be used to store the value
value : str
The value to be stored
Returns
-------
None
"""
with self.driver.session() as session:
query = """
MERGE (n:Data {key: $key})
SET n.value = $value
RETURN n
"""
result = session.run(query, key=key, value=value)
# Print the result (the created or updated node)
for record in result:
print(f"Upserted Node: {record['n']}")
def read_by_value(self, value: str) -> list:
"""
Execute a read on the Neo4j database to find nodes based on the value.
Parameters
----------
value : str
The value to search for in the database
Returns
-------
str
The key associated with the value if found, else None
"""
with self.driver.session() as session:
query = """
MATCH (n:Data {value: $value})
RETURN n.key AS key
"""
result = session.run(query, value=value)
# Return the key if the node is found, otherwise None
for record in result:
return record["key"]
# If no key is found for the value, return None
return None
|